Compare commits
No commits in common. "0688ed4dd1522dd3b230f910e227d5023f2b3af0" and "a6bb538ce94cb1f64e7408f3a4bd5c577c5ca403" have entirely different histories.
0688ed4dd1
...
a6bb538ce9
10 changed files with 152 additions and 145 deletions
|
@ -1,4 +1,11 @@
|
|||
# -*- mode: conf; -*-
|
||||
|
||||
[MESSAGES CONTROL]
|
||||
disable=fixme
|
||||
disable=
|
||||
attribute-defined-outside-init,
|
||||
fixme,
|
||||
import-outside-toplevel,
|
||||
too-many-arguments,
|
||||
too-many-branches,
|
||||
too-many-locals,
|
||||
too-many-positional-arguments,
|
||||
|
|
|
@ -25,12 +25,11 @@ WuttJamaican - app handler
|
|||
"""
|
||||
# pylint: disable=too-many-lines
|
||||
|
||||
import importlib
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import warnings
|
||||
import importlib
|
||||
from importlib.metadata import version
|
||||
|
||||
import humanize
|
||||
|
||||
|
@ -143,7 +142,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
|
|||
return self.get_enum()
|
||||
|
||||
if name == "providers":
|
||||
self.__dict__["providers"] = self.get_all_providers()
|
||||
self.providers = self.get_all_providers()
|
||||
return self.providers
|
||||
|
||||
for provider in self.providers.values():
|
||||
|
@ -283,13 +282,9 @@ class AppHandler: # pylint: disable=too-many-public-methods
|
|||
pkgname = modpath.split(".")[0]
|
||||
|
||||
try:
|
||||
from importlib.metadata import ( # pylint: disable=import-outside-toplevel
|
||||
packages_distributions,
|
||||
)
|
||||
from importlib.metadata import packages_distributions
|
||||
except ImportError: # python < 3.10
|
||||
from importlib_metadata import ( # pylint: disable=import-outside-toplevel
|
||||
packages_distributions,
|
||||
)
|
||||
from importlib_metadata import packages_distributions
|
||||
|
||||
pkgmap = packages_distributions()
|
||||
if pkgname in pkgmap:
|
||||
|
@ -311,6 +306,8 @@ class AppHandler: # pylint: disable=too-many-public-methods
|
|||
|
||||
:returns: Version as string.
|
||||
"""
|
||||
from importlib.metadata import version
|
||||
|
||||
if not dist:
|
||||
dist = self.get_distribution(obj=obj)
|
||||
if dist:
|
||||
|
@ -340,7 +337,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
|
|||
usedb=False,
|
||||
default=self.default_model_spec,
|
||||
)
|
||||
self.__dict__["model"] = importlib.import_module(spec)
|
||||
self.model = importlib.import_module(spec)
|
||||
return self.model
|
||||
|
||||
def get_enum(self):
|
||||
|
@ -364,7 +361,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
|
|||
spec = self.config.get(
|
||||
f"{self.appname}.enum_spec", usedb=False, default=self.default_enum_spec
|
||||
)
|
||||
self.__dict__["enum"] = importlib.import_module(spec)
|
||||
self.enum = importlib.import_module(spec)
|
||||
return self.enum
|
||||
|
||||
def load_object(self, spec):
|
||||
|
@ -499,7 +496,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
|
|||
|
||||
:returns: SQLAlchemy session for the app DB.
|
||||
"""
|
||||
from .db import Session # pylint: disable=import-outside-toplevel
|
||||
from .db import Session
|
||||
|
||||
return Session(**kwargs)
|
||||
|
||||
|
@ -585,7 +582,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
|
|||
associated. Simple convenience wrapper around
|
||||
:func:`sqlalchemy:sqlalchemy.orm.object_session()`.
|
||||
"""
|
||||
from sqlalchemy import orm # pylint: disable=import-outside-toplevel
|
||||
from sqlalchemy import orm
|
||||
|
||||
return orm.object_session(obj)
|
||||
|
||||
|
@ -600,7 +597,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
|
|||
this method will provide a default factory in the form of
|
||||
:meth:`make_session`.
|
||||
"""
|
||||
from .db import short_session # pylint: disable=import-outside-toplevel
|
||||
from .db import short_session
|
||||
|
||||
if "factory" not in kwargs and "config" not in kwargs:
|
||||
kwargs["factory"] = self.make_session
|
||||
|
@ -628,7 +625,7 @@ class AppHandler: # pylint: disable=too-many-public-methods
|
|||
|
||||
:returns: Setting value as string, or ``None``.
|
||||
"""
|
||||
from .db import get_setting # pylint: disable=import-outside-toplevel
|
||||
from .db import get_setting
|
||||
|
||||
return get_setting(session, name)
|
||||
|
||||
|
@ -1117,6 +1114,7 @@ class AppProvider: # pylint: disable=too-few-public-methods
|
|||
"""
|
||||
|
||||
def __init__(self, config):
|
||||
|
||||
if isinstance(config, AppHandler):
|
||||
warnings.warn(
|
||||
"passing app handler to app provider is deprecated; "
|
||||
|
@ -1154,7 +1152,6 @@ class GenericHandler:
|
|||
self.config = config
|
||||
self.app = self.config.get_app()
|
||||
self.modules = {}
|
||||
self.classes = {}
|
||||
|
||||
@property
|
||||
def appname(self):
|
||||
|
|
|
@ -108,7 +108,7 @@ class AuthHandler(GenericHandler): # pylint: disable=too-many-public-methods
|
|||
:returns: :class:`~wuttjamaican.db.model.auth.User` instance,
|
||||
or ``None``.
|
||||
"""
|
||||
from sqlalchemy import orm # pylint: disable=import-outside-toplevel
|
||||
from sqlalchemy import orm
|
||||
|
||||
model = self.app.model
|
||||
|
||||
|
@ -170,6 +170,7 @@ class AuthHandler(GenericHandler): # pylint: disable=too-many-public-methods
|
|||
return role
|
||||
|
||||
else: # assuming it is a string
|
||||
|
||||
# try to match on Role.uuid
|
||||
try:
|
||||
role = session.get(model.Role, _uuid.UUID(key))
|
||||
|
@ -225,6 +226,7 @@ class AuthHandler(GenericHandler): # pylint: disable=too-many-public-methods
|
|||
|
||||
# nb. these lookups require a db session
|
||||
if session:
|
||||
|
||||
# or maybe it is a uuid
|
||||
if isinstance(obj, _uuid.UUID):
|
||||
user = session.get(model.User, obj)
|
||||
|
@ -233,6 +235,7 @@ class AuthHandler(GenericHandler): # pylint: disable=too-many-public-methods
|
|||
|
||||
# or maybe it is a string
|
||||
elif isinstance(obj, str):
|
||||
|
||||
# try to match on User.uuid
|
||||
try:
|
||||
user = session.get(model.User, _uuid.UUID(obj))
|
||||
|
@ -512,7 +515,7 @@ class AuthHandler(GenericHandler): # pylint: disable=too-many-public-methods
|
|||
|
||||
return cache
|
||||
|
||||
def has_permission( # pylint: disable=too-many-arguments,too-many-positional-arguments
|
||||
def has_permission(
|
||||
self,
|
||||
session,
|
||||
principal,
|
||||
|
|
|
@ -189,11 +189,10 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
|
|||
See also :ref:`where-config-settings-come-from`.
|
||||
"""
|
||||
|
||||
_app = None
|
||||
default_app_handler_spec = "wuttjamaican.app:AppHandler"
|
||||
default_engine_maker_spec = "wuttjamaican.db.conf:make_engine_from_config"
|
||||
|
||||
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
|
||||
def __init__(
|
||||
self,
|
||||
files=None,
|
||||
defaults=None,
|
||||
|
@ -241,10 +240,7 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
|
|||
|
||||
# configure main app DB if applicable, or disable usedb flag
|
||||
try:
|
||||
from wuttjamaican.db import ( # pylint: disable=import-outside-toplevel
|
||||
Session,
|
||||
get_engines,
|
||||
)
|
||||
from wuttjamaican.db import Session, get_engines
|
||||
except ImportError:
|
||||
if self.usedb:
|
||||
log.warning(
|
||||
|
@ -276,8 +272,20 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
|
|||
raise FileNotFoundError(f"could not read required config file: {path}")
|
||||
return
|
||||
|
||||
# write config to temp file
|
||||
temp_path = self._write_temp_config_file(config)
|
||||
# load all values into (yet another) temp config
|
||||
temp_config = configparser.RawConfigParser()
|
||||
for section in config.sections():
|
||||
temp_config.add_section(section)
|
||||
# nb. must interpolate most values but *not* for logging formatters
|
||||
raw = section.startswith("formatter_")
|
||||
for option in config.options(section):
|
||||
temp_config.set(section, option, config.get(section, option, raw=raw))
|
||||
|
||||
# re-write as temp file with "final" values
|
||||
fd, temp_path = tempfile.mkstemp(suffix=".ini")
|
||||
os.close(fd)
|
||||
with open(temp_path, "wt", encoding="utf_8") as f:
|
||||
temp_config.write(f)
|
||||
|
||||
# and finally, load that into our main config
|
||||
config = configuration.config_from_ini(temp_path, read_from_file=True)
|
||||
|
@ -297,24 +305,6 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
|
|||
for p in self.parse_list(includes):
|
||||
self._load_ini_configs(p, configs, require=False)
|
||||
|
||||
def _write_temp_config_file(self, config):
|
||||
# load all values into (yet another) temp config
|
||||
temp_config = configparser.RawConfigParser()
|
||||
for section in config.sections():
|
||||
temp_config.add_section(section)
|
||||
# nb. must interpolate most values but *not* for logging formatters
|
||||
raw = section.startswith("formatter_")
|
||||
for option in config.options(section):
|
||||
temp_config.set(section, option, config.get(section, option, raw=raw))
|
||||
|
||||
# re-write as temp file with "final" values
|
||||
fd, temp_path = tempfile.mkstemp(suffix=".ini")
|
||||
os.close(fd)
|
||||
with open(temp_path, "wt", encoding="utf_8") as f:
|
||||
temp_config.write(f)
|
||||
|
||||
return temp_path
|
||||
|
||||
def get_prioritized_files(self):
|
||||
"""
|
||||
Returns list of config files in order of priority.
|
||||
|
@ -343,7 +333,7 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
|
|||
# get current value, sans db
|
||||
return self.get(key, usedb=False)
|
||||
|
||||
def get( # pylint: disable=too-many-arguments,too-many-positional-arguments
|
||||
def get(
|
||||
self,
|
||||
key,
|
||||
default=UNSPECIFIED,
|
||||
|
@ -458,6 +448,7 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
|
|||
# read from defaults + INI files
|
||||
value = self.configuration.get(key)
|
||||
if value is not None:
|
||||
|
||||
# nb. if the "value" corresponding to the given key is in
|
||||
# fact a subset/dict of more config values, then we must
|
||||
# "ignore" that. so only return the value if it is *not*
|
||||
|
@ -620,6 +611,7 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
|
|||
os.remove(path)
|
||||
|
||||
def _write_logging_config_file(self):
|
||||
|
||||
# load all current values into configparser
|
||||
parser = configparser.RawConfigParser()
|
||||
for section, values in self.configuration.items():
|
||||
|
@ -641,7 +633,7 @@ class WuttaConfig: # pylint: disable=too-many-instance-attributes
|
|||
|
||||
See also :doc:`/narr/handlers/app`.
|
||||
"""
|
||||
if not self._app:
|
||||
if not hasattr(self, "_app"):
|
||||
spec = self.get(
|
||||
f"{self.appname}.app.handler",
|
||||
usedb=False,
|
||||
|
@ -718,10 +710,7 @@ def generic_default_files(appname):
|
|||
if sys.platform == "win32":
|
||||
# use pywin32 to fetch official defaults
|
||||
try:
|
||||
from win32com.shell import ( # pylint: disable=import-outside-toplevel
|
||||
shell,
|
||||
shellcon,
|
||||
)
|
||||
from win32com.shell import shell, shellcon
|
||||
except ImportError:
|
||||
return []
|
||||
|
||||
|
@ -760,7 +749,7 @@ def generic_default_files(appname):
|
|||
]
|
||||
|
||||
|
||||
def get_config_paths( # pylint: disable=too-many-arguments,too-many-positional-arguments
|
||||
def get_config_paths(
|
||||
files=None,
|
||||
plus_files=None,
|
||||
appname="wutta",
|
||||
|
@ -884,7 +873,28 @@ def get_config_paths( # pylint: disable=too-many-arguments,too-many-positional-
|
|||
|
||||
# first identify any "primary" config files
|
||||
if files is None:
|
||||
files = _get_primary_config_files(appname, env, env_files_name, default_files)
|
||||
if not env_files_name:
|
||||
env_files_name = f"{appname.upper()}_CONFIG_FILES"
|
||||
|
||||
files = env.get(env_files_name)
|
||||
if files is not None:
|
||||
files = files.split(os.pathsep)
|
||||
|
||||
elif default_files:
|
||||
if callable(default_files):
|
||||
files = default_files(appname) or []
|
||||
elif isinstance(default_files, str):
|
||||
files = [default_files]
|
||||
else:
|
||||
files = list(default_files)
|
||||
files = [path for path in files if os.path.exists(path)]
|
||||
|
||||
else:
|
||||
files = []
|
||||
for path in generic_default_files(appname):
|
||||
if os.path.exists(path):
|
||||
files.append(path)
|
||||
|
||||
elif isinstance(files, str):
|
||||
files = [files]
|
||||
else:
|
||||
|
@ -916,48 +926,19 @@ def get_config_paths( # pylint: disable=too-many-arguments,too-many-positional-
|
|||
# which config file as part of the actual service definition in
|
||||
# windows, so the service name is used for magic lookup here.
|
||||
if winsvc:
|
||||
files = _get_winsvc_config_files(appname, winsvc, files)
|
||||
config = configparser.ConfigParser()
|
||||
config.read(files)
|
||||
section = f"{appname}.config"
|
||||
if config.has_section(section):
|
||||
option = f"winsvc.{winsvc}"
|
||||
if config.has_option(section, option):
|
||||
# replace file paths with whatever config value says
|
||||
files = parse_list(config.get(section, option))
|
||||
|
||||
return files
|
||||
|
||||
|
||||
def _get_primary_config_files(appname, env, env_files_name, default_files):
|
||||
if not env_files_name:
|
||||
env_files_name = f"{appname.upper()}_CONFIG_FILES"
|
||||
|
||||
files = env.get(env_files_name)
|
||||
if files is not None:
|
||||
return files.split(os.pathsep)
|
||||
|
||||
if default_files:
|
||||
if callable(default_files):
|
||||
files = default_files(appname) or []
|
||||
elif isinstance(default_files, str):
|
||||
files = [default_files]
|
||||
else:
|
||||
files = list(default_files)
|
||||
return [path for path in files if os.path.exists(path)]
|
||||
|
||||
files = []
|
||||
for path in generic_default_files(appname):
|
||||
if os.path.exists(path):
|
||||
files.append(path)
|
||||
return files
|
||||
|
||||
|
||||
def _get_winsvc_config_files(appname, winsvc, files):
|
||||
config = configparser.ConfigParser()
|
||||
config.read(files)
|
||||
section = f"{appname}.config"
|
||||
if config.has_section(section):
|
||||
option = f"winsvc.{winsvc}"
|
||||
if config.has_option(section, option):
|
||||
# replace file paths with whatever config value says
|
||||
files = parse_list(config.get(section, option))
|
||||
return files
|
||||
|
||||
|
||||
def make_config( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
|
||||
def make_config(
|
||||
files=None,
|
||||
plus_files=None,
|
||||
appname="wutta",
|
||||
|
|
|
@ -172,7 +172,7 @@ class Message: # pylint: disable=too-many-instance-attributes
|
|||
List of file attachments for the message.
|
||||
"""
|
||||
|
||||
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
|
||||
def __init__(
|
||||
self,
|
||||
key=None,
|
||||
sender=None,
|
||||
|
@ -277,6 +277,7 @@ class EmailHandler(GenericHandler): # pylint: disable=too-many-public-methods
|
|||
# prefer configured list of template lookup paths, if set
|
||||
templates = self.config.get_list(f"{self.config.appname}.email.templates")
|
||||
if not templates:
|
||||
|
||||
# otherwise use all available paths, from app providers
|
||||
available = []
|
||||
for provider in self.app.providers.values():
|
||||
|
@ -323,8 +324,8 @@ class EmailHandler(GenericHandler): # pylint: disable=too-many-public-methods
|
|||
This calls :meth:`get_email_modules()` and for each module, it
|
||||
discovers all the email settings it contains.
|
||||
"""
|
||||
if "email_settings" not in self.classes:
|
||||
self.classes["email_settings"] = {}
|
||||
if not hasattr(self, "_email_settings"):
|
||||
self._email_settings = {}
|
||||
for module in self.get_email_modules():
|
||||
for name in dir(module):
|
||||
obj = getattr(module, name)
|
||||
|
@ -333,9 +334,9 @@ class EmailHandler(GenericHandler): # pylint: disable=too-many-public-methods
|
|||
and obj is not EmailSetting
|
||||
and issubclass(obj, EmailSetting)
|
||||
):
|
||||
self.classes["email_settings"][obj.__name__] = obj
|
||||
self._email_settings[obj.__name__] = obj
|
||||
|
||||
return self.classes["email_settings"]
|
||||
return self._email_settings
|
||||
|
||||
def get_email_setting(self, key, instance=True):
|
||||
"""
|
||||
|
@ -454,7 +455,7 @@ class EmailHandler(GenericHandler): # pylint: disable=too-many-public-methods
|
|||
# fall back to global default, if present
|
||||
return self.config.get(f"{self.config.appname}.email.default.replyto")
|
||||
|
||||
def get_auto_subject( # pylint: disable=too-many-arguments,too-many-positional-arguments
|
||||
def get_auto_subject(
|
||||
self, key, context=None, rendered=True, setting=None, default=None
|
||||
):
|
||||
"""
|
||||
|
@ -755,7 +756,7 @@ class EmailHandler(GenericHandler): # pylint: disable=too-many-public-methods
|
|||
f"{self.config.appname}.mail.send_emails", default=False
|
||||
)
|
||||
|
||||
def send_email( # pylint: disable=too-many-arguments,too-many-positional-arguments
|
||||
def send_email(
|
||||
self, key=None, context=None, message=None, sender=None, recips=None, **kwargs
|
||||
):
|
||||
"""
|
||||
|
|
|
@ -85,7 +85,6 @@ class InstallHandler(GenericHandler):
|
|||
app_title = None
|
||||
pypi_name = None
|
||||
egg_name = None
|
||||
schema_installed = False
|
||||
|
||||
def __init__(self, config, **kwargs):
|
||||
super().__init__(config)
|
||||
|
@ -101,6 +100,24 @@ class InstallHandler(GenericHandler):
|
|||
if not self.egg_name:
|
||||
self.egg_name = self.pypi_name.replace("-", "_")
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Run the interactive command-line installer.
|
||||
|
||||
This does the following:
|
||||
|
||||
* check for ``prompt_toolkit`` and maybe ask to install it
|
||||
* define the template lookup paths, for making config files
|
||||
* call :meth:`show_welcome()`
|
||||
* call :meth:`sanity_check()`
|
||||
* call :meth:`do_install_steps()`
|
||||
* call :meth:`show_goodbye()`
|
||||
|
||||
Although if a problem is encountered then not all calls may
|
||||
happen.
|
||||
"""
|
||||
self.require_prompt_toolkit()
|
||||
|
||||
paths = [
|
||||
self.app.resource_path("wuttjamaican:templates/install"),
|
||||
]
|
||||
|
@ -114,22 +131,6 @@ class InstallHandler(GenericHandler):
|
|||
|
||||
self.templates = TemplateLookup(directories=paths)
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Run the interactive command-line installer.
|
||||
|
||||
This does the following:
|
||||
|
||||
* check for ``prompt_toolkit`` and maybe ask to install it
|
||||
* call :meth:`show_welcome()`
|
||||
* call :meth:`sanity_check()`
|
||||
* call :meth:`do_install_steps()`
|
||||
* call :meth:`show_goodbye()`
|
||||
|
||||
Although if a problem is encountered then not all calls may
|
||||
happen.
|
||||
"""
|
||||
self.require_prompt_toolkit()
|
||||
self.show_welcome()
|
||||
self.sanity_check()
|
||||
self.schema_installed = False
|
||||
|
@ -237,9 +238,9 @@ class InstallHandler(GenericHandler):
|
|||
|
||||
def make_db_url(
|
||||
self, dbtype, dbhost, dbport, dbname, dbuser, dbpass
|
||||
): # pylint: disable=empty-docstring,too-many-arguments,too-many-positional-arguments
|
||||
): # pylint: disable=empty-docstring
|
||||
""" """
|
||||
from sqlalchemy.engine import URL # pylint: disable=import-outside-toplevel
|
||||
from sqlalchemy.engine import URL
|
||||
|
||||
if dbtype == "mysql":
|
||||
drivername = "mysql+mysqlconnector"
|
||||
|
@ -257,7 +258,7 @@ class InstallHandler(GenericHandler):
|
|||
|
||||
def test_db_connection(self, url): # pylint: disable=empty-docstring
|
||||
""" """
|
||||
import sqlalchemy as sa # pylint: disable=import-outside-toplevel
|
||||
import sqlalchemy as sa
|
||||
|
||||
engine = sa.create_engine(url)
|
||||
|
||||
|
@ -441,9 +442,7 @@ class InstallHandler(GenericHandler):
|
|||
:param db_url: :class:`sqlalchemy:sqlalchemy.engine.URL`
|
||||
instance.
|
||||
"""
|
||||
from alembic.util.messaging import ( # pylint: disable=import-outside-toplevel
|
||||
obfuscate_url_pw,
|
||||
)
|
||||
from alembic.util.messaging import obfuscate_url_pw
|
||||
|
||||
if not self.prompt_bool("install db schema?", True):
|
||||
return False
|
||||
|
@ -489,7 +488,7 @@ class InstallHandler(GenericHandler):
|
|||
def require_prompt_toolkit(self, answer=None): # pylint: disable=empty-docstring
|
||||
""" """
|
||||
try:
|
||||
import prompt_toolkit # pylint: disable=unused-import,import-outside-toplevel
|
||||
import prompt_toolkit # pylint: disable=unused-import
|
||||
except ImportError:
|
||||
value = answer or input(
|
||||
"\nprompt_toolkit is not installed. shall i install it? [Yn] "
|
||||
|
@ -504,7 +503,7 @@ class InstallHandler(GenericHandler):
|
|||
)
|
||||
|
||||
# nb. this should now succeed
|
||||
import prompt_toolkit # pylint: disable=import-outside-toplevel
|
||||
import prompt_toolkit
|
||||
|
||||
def rprint(self, *args, **kwargs):
|
||||
"""
|
||||
|
@ -514,9 +513,7 @@ class InstallHandler(GenericHandler):
|
|||
|
||||
def get_prompt_style(self): # pylint: disable=empty-docstring
|
||||
""" """
|
||||
from prompt_toolkit.styles import ( # pylint: disable=import-outside-toplevel
|
||||
Style,
|
||||
)
|
||||
from prompt_toolkit.styles import Style
|
||||
|
||||
# message formatting styles
|
||||
return Style.from_dict(
|
||||
|
@ -526,7 +523,7 @@ class InstallHandler(GenericHandler):
|
|||
}
|
||||
)
|
||||
|
||||
def prompt_generic( # pylint: disable=too-many-arguments,too-many-positional-arguments
|
||||
def prompt_generic(
|
||||
self,
|
||||
info,
|
||||
default=None,
|
||||
|
@ -557,7 +554,7 @@ class InstallHandler(GenericHandler):
|
|||
unless ``is_bool`` was requested in which case ``True`` or
|
||||
``False``.
|
||||
"""
|
||||
from prompt_toolkit import prompt # pylint: disable=import-outside-toplevel
|
||||
from prompt_toolkit import prompt
|
||||
|
||||
# build prompt message
|
||||
message = [
|
||||
|
|
|
@ -157,8 +157,8 @@ class ReportHandler(GenericHandler):
|
|||
This calls :meth:`get_report_modules()` and for each module,
|
||||
it discovers all the reports it contains.
|
||||
"""
|
||||
if "reports" not in self.classes:
|
||||
self.classes["reports"] = {}
|
||||
if not hasattr(self, "_reports"):
|
||||
self._reports = {}
|
||||
for module in self.get_report_modules():
|
||||
for name in dir(module):
|
||||
obj = getattr(module, name)
|
||||
|
@ -167,9 +167,9 @@ class ReportHandler(GenericHandler):
|
|||
and obj is not Report
|
||||
and issubclass(obj, Report)
|
||||
):
|
||||
self.classes["reports"][obj.report_key] = obj
|
||||
self._reports[obj.report_key] = obj
|
||||
|
||||
return self.classes["reports"]
|
||||
return self._reports
|
||||
|
||||
def get_report(self, key, instance=True):
|
||||
"""
|
||||
|
|
|
@ -101,9 +101,9 @@ def load_entry_points(group, ignore_errors=False):
|
|||
|
||||
try:
|
||||
# nb. this package was added in python 3.8
|
||||
import importlib.metadata as importlib_metadata # pylint: disable=import-outside-toplevel
|
||||
import importlib.metadata as importlib_metadata
|
||||
except ImportError:
|
||||
import importlib_metadata # pylint: disable=import-outside-toplevel
|
||||
import importlib_metadata
|
||||
|
||||
eps = importlib_metadata.entry_points()
|
||||
if not hasattr(eps, "select"):
|
||||
|
@ -306,7 +306,9 @@ def progress_loop(func, items, factory, message=None):
|
|||
progress = factory(message, count)
|
||||
|
||||
for i, item in enumerate(items, 1):
|
||||
|
||||
func(item, i)
|
||||
|
||||
if progress:
|
||||
progress.update(i)
|
||||
|
||||
|
@ -341,17 +343,12 @@ def resource_path(path):
|
|||
:returns: Absolute file path to the resource.
|
||||
"""
|
||||
if not os.path.isabs(path) and ":" in path:
|
||||
|
||||
try:
|
||||
# nb. these were added in python 3.9
|
||||
from importlib.resources import ( # pylint: disable=import-outside-toplevel
|
||||
files,
|
||||
as_file,
|
||||
)
|
||||
from importlib.resources import files, as_file
|
||||
except ImportError: # python < 3.9
|
||||
from importlib_resources import ( # pylint: disable=import-outside-toplevel
|
||||
files,
|
||||
as_file,
|
||||
)
|
||||
from importlib_resources import files, as_file
|
||||
|
||||
package, filename = path.split(":")
|
||||
ref = files(package) / filename
|
||||
|
|
|
@ -17,6 +17,7 @@ from wuttjamaican.testing import FileTestCase, ConfigTestCase
|
|||
|
||||
|
||||
class TestWuttaConfig(FileTestCase):
|
||||
|
||||
def make_config(self, **kwargs):
|
||||
return mod.WuttaConfig(**kwargs)
|
||||
|
||||
|
@ -290,6 +291,7 @@ configure_logging = true
|
|||
)
|
||||
|
||||
with patch.object(conf.WuttaConfig, "_configure_logging") as method:
|
||||
|
||||
# no logging config by default
|
||||
config = conf.WuttaConfig()
|
||||
method.assert_not_called()
|
||||
|
@ -317,6 +319,7 @@ configure_logging = true
|
|||
)
|
||||
|
||||
with patch("wuttjamaican.conf.logging") as logging:
|
||||
|
||||
# basic constructor attempts logging config
|
||||
config = conf.WuttaConfig(configure_logging=True)
|
||||
logging.config.fileConfig.assert_called_once()
|
||||
|
@ -335,14 +338,14 @@ configure_logging = true
|
|||
logging.config.fileConfig.assert_called_once()
|
||||
|
||||
def test_config_has_no_app_after_init(self):
|
||||
|
||||
# initial config should *not* have an app yet, otherwise
|
||||
# extensions cannot specify a default app handler
|
||||
config = conf.WuttaConfig()
|
||||
self.assertIsNone(config._app)
|
||||
self.assertFalse(hasattr(config, "_app"))
|
||||
|
||||
# but after that we can get an app okay
|
||||
app = config.get_app()
|
||||
self.assertIsNotNone(app)
|
||||
self.assertIs(app, config._app)
|
||||
|
||||
def test_setdefault(self):
|
||||
|
@ -625,6 +628,7 @@ configure_logging = true
|
|||
self.assertEqual(value[2], "baz")
|
||||
|
||||
def test_get_app(self):
|
||||
|
||||
# default handler
|
||||
config = conf.WuttaConfig()
|
||||
self.assertEqual(config.default_app_handler_spec, "wuttjamaican.app:AppHandler")
|
||||
|
@ -682,6 +686,7 @@ def custom_make_engine_from_config():
|
|||
|
||||
|
||||
class TestWuttaConfigExtension(TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
config = conf.WuttaConfig()
|
||||
ext = conf.WuttaConfigExtension()
|
||||
|
@ -690,6 +695,7 @@ class TestWuttaConfigExtension(TestCase):
|
|||
|
||||
|
||||
class TestGenericDefaultFiles(TestCase):
|
||||
|
||||
def test_linux(self):
|
||||
files = conf.generic_default_files("wuttatest")
|
||||
self.assertIsInstance(files, list)
|
||||
|
@ -701,6 +707,7 @@ class TestGenericDefaultFiles(TestCase):
|
|||
win32com.shell.SHGetSpecialFolderPath.return_value = r"C:" + os.sep
|
||||
with patch.dict("sys.modules", **{"win32com.shell": win32com}):
|
||||
with patch("wuttjamaican.conf.sys", platform="win32"):
|
||||
|
||||
files = conf.generic_default_files("wuttatest")
|
||||
self.assertIsInstance(files, list)
|
||||
self.assertTrue(len(files) > 1)
|
||||
|
@ -716,12 +723,14 @@ class TestGenericDefaultFiles(TestCase):
|
|||
|
||||
with patch("builtins.__import__", side_effect=mock_import):
|
||||
with patch("wuttjamaican.conf.sys", platform="win32"):
|
||||
|
||||
files = conf.generic_default_files("wuttatest")
|
||||
self.assertIsInstance(files, list)
|
||||
self.assertEqual(len(files), 0)
|
||||
|
||||
|
||||
class TestGetConfigPaths(FileTestCase):
|
||||
|
||||
def test_winsvc(self):
|
||||
myconf = self.write_file(
|
||||
"my.conf",
|
||||
|
@ -744,6 +753,7 @@ winsvc.RattailFileMonitor = /path/to/other/file
|
|||
|
||||
|
||||
class TestMakeConfig(FileTestCase):
|
||||
|
||||
# nb. we use appname='wuttatest' in this suite to avoid any
|
||||
# "valid" default config files, env vars etc. which may be present
|
||||
# on the dev machine
|
||||
|
@ -753,6 +763,7 @@ class TestMakeConfig(FileTestCase):
|
|||
|
||||
with patch("wuttjamaican.conf.generic_default_files") as generic_default_files:
|
||||
with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig:
|
||||
|
||||
# generic files are used if nothing is specified
|
||||
generic_default_files.return_value = [generic]
|
||||
config = conf.make_config(appname="wuttatest")
|
||||
|
@ -777,6 +788,7 @@ class TestMakeConfig(FileTestCase):
|
|||
|
||||
with patch("wuttjamaican.conf.generic_default_files") as generic_default_files:
|
||||
with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig:
|
||||
|
||||
# generic defaults are used if nothing specified
|
||||
generic_default_files.return_value = [generic]
|
||||
config = conf.make_config(appname="wuttatest")
|
||||
|
@ -820,6 +832,7 @@ class TestMakeConfig(FileTestCase):
|
|||
|
||||
with patch("wuttjamaican.conf.generic_default_files") as generic_default_files:
|
||||
with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig:
|
||||
|
||||
generic_default_files.return_value = [generic]
|
||||
|
||||
# no plus files by default
|
||||
|
@ -864,6 +877,7 @@ class TestMakeConfig(FileTestCase):
|
|||
|
||||
with patch("wuttjamaican.conf.generic_default_files") as generic_default_files:
|
||||
with patch("wuttjamaican.conf.WuttaConfig") as WuttaConfig:
|
||||
|
||||
generic_default_files.return_value = [generic]
|
||||
|
||||
# generic files by default
|
||||
|
@ -908,6 +922,7 @@ class TestMakeConfig(FileTestCase):
|
|||
|
||||
with patch.object(mod, "WuttaConfig") as WuttaConfig:
|
||||
with patch.object(mod, "load_entry_points") as load_entry_points:
|
||||
|
||||
# no entry points loaded if extend=False
|
||||
config = mod.make_config(appname="wuttatest", extend=False)
|
||||
WuttaConfig.assert_called_once_with(
|
||||
|
@ -952,6 +967,7 @@ class TestMakeConfig(FileTestCase):
|
|||
|
||||
|
||||
class TestWuttaConfigProfile(ConfigTestCase):
|
||||
|
||||
def make_profile(self, key):
|
||||
return mod.WuttaConfigProfile(self.config, key)
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ from wuttjamaican.testing import ConfigTestCase
|
|||
|
||||
|
||||
class TestInstallHandler(ConfigTestCase):
|
||||
|
||||
def make_handler(self, **kwargs):
|
||||
return mod.InstallHandler(self.config, **kwargs)
|
||||
|
||||
|
@ -37,6 +38,7 @@ class TestInstallHandler(ConfigTestCase):
|
|||
with patch.object(mod, "sys") as sys:
|
||||
with patch.object(handler, "rprint") as rprint:
|
||||
with patch.object(handler, "prompt_bool") as prompt_bool:
|
||||
|
||||
# user continues
|
||||
prompt_bool.return_value = True
|
||||
handler.show_welcome()
|
||||
|
@ -52,6 +54,7 @@ class TestInstallHandler(ConfigTestCase):
|
|||
with patch.object(mod, "sys") as sys:
|
||||
with patch.object(mod, "os") as os:
|
||||
with patch.object(handler, "rprint") as rprint:
|
||||
|
||||
# pretend appdir does not exist
|
||||
os.path.exists.return_value = False
|
||||
handler.sanity_check()
|
||||
|
@ -76,9 +79,10 @@ class TestInstallHandler(ConfigTestCase):
|
|||
with patch.object(handler, "get_dbinfo", return_value=dbinfo):
|
||||
with patch.object(handler, "make_appdir") as make_appdir:
|
||||
with patch.object(handler, "install_db_schema") as install_db_schema:
|
||||
|
||||
# nb. just for sanity/coverage
|
||||
install_db_schema.return_value = True
|
||||
self.assertFalse(handler.schema_installed)
|
||||
self.assertFalse(hasattr(handler, "schema_installed"))
|
||||
handler.do_install_steps()
|
||||
self.assertTrue(make_appdir.called)
|
||||
self.assertTrue(handler.schema_installed)
|
||||
|
@ -105,6 +109,7 @@ class TestInstallHandler(ConfigTestCase):
|
|||
with patch.object(handler, "prompt_generic", side_effect=prompt_generic):
|
||||
with patch.object(handler, "test_db_connection") as test_db_connection:
|
||||
with patch.object(handler, "rprint") as rprint:
|
||||
|
||||
# bad dbinfo
|
||||
test_db_connection.return_value = "bad dbinfo"
|
||||
sys.exit.side_effect = RuntimeError
|
||||
|
@ -215,6 +220,7 @@ default.url = {db_url}
|
|||
db_url = sa.create_engine(db_url).url
|
||||
|
||||
with patch.object(mod, "subprocess") as subprocess:
|
||||
|
||||
# user declines offer to install schema
|
||||
with patch.object(handler, "prompt_bool", return_value=False):
|
||||
self.assertFalse(handler.install_db_schema(db_url, appdir=self.tempdir))
|
||||
|
@ -315,6 +321,7 @@ default.url = {db_url}
|
|||
with patch("builtins.__import__", side_effect=mock_import):
|
||||
with patch.object(handler, "get_prompt_style", return_value=style):
|
||||
with patch.object(handler, "rprint") as rprint:
|
||||
|
||||
# no input or default value
|
||||
mock_prompt.return_value = ""
|
||||
result = handler.prompt_generic("foo")
|
||||
|
@ -445,6 +452,7 @@ default.url = {db_url}
|
|||
|
||||
with patch("builtins.__import__", side_effect=mock_import):
|
||||
with patch.object(handler, "rprint") as rprint:
|
||||
|
||||
# no default; true input
|
||||
mock_prompt.reset_mock()
|
||||
mock_prompt.return_value = "Y"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue