From 8be1b66c9e6a68c38b5279f2ded2086d2b140a81 Mon Sep 17 00:00:00 2001 From: Lance Edgar Date: Sun, 31 Aug 2025 12:47:46 -0500 Subject: [PATCH] fix: format all code with black and from now on should not deviate from that... --- docs/conf.py | 30 +++--- src/wuttatell/_version.py | 2 +- src/wuttatell/app.py | 8 +- src/wuttatell/cli/tell.py | 32 +++--- src/wuttatell/client.py | 52 +++++---- src/wuttatell/telemetry.py | 63 +++++------ tasks.py | 10 +- tests/cli/test_tell.py | 4 +- tests/test_client.py | 182 +++++++++++++++++++------------- tests/test_telemetry.py | 209 ++++++++++++++++++++----------------- 10 files changed, 334 insertions(+), 258 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index d65a6ba..3a48ab6 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -8,33 +8,33 @@ from importlib.metadata import version as get_version -project = 'WuttaTell' -copyright = '2025, Lance Edgar' -author = 'Lance Edgar' -release = get_version('WuttaTell') +project = "WuttaTell" +copyright = "2025, Lance Edgar" +author = "Lance Edgar" +release = get_version("WuttaTell") # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.intersphinx', - 'sphinx.ext.viewcode', - 'sphinx.ext.todo', - 'sphinxcontrib.programoutput', + "sphinx.ext.autodoc", + "sphinx.ext.intersphinx", + "sphinx.ext.viewcode", + "sphinx.ext.todo", + "sphinxcontrib.programoutput", ] -templates_path = ['_templates'] -exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] +templates_path = ["_templates"] +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] intersphinx_mapping = { - 'requests': ('https://requests.readthedocs.io/en/latest/', None), - 'wuttjamaican': ('https://docs.wuttaproject.org/wuttjamaican/', None), + "requests": ("https://requests.readthedocs.io/en/latest/", None), + "wuttjamaican": ("https://docs.wuttaproject.org/wuttjamaican/", None), } # -- Options for HTML output ------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output -html_theme = 'furo' -html_static_path = ['_static'] +html_theme = "furo" +html_static_path = ["_static"] diff --git a/src/wuttatell/_version.py b/src/wuttatell/_version.py index 3e4054f..1169b80 100644 --- a/src/wuttatell/_version.py +++ b/src/wuttatell/_version.py @@ -3,4 +3,4 @@ from importlib.metadata import version -__version__ = version('WuttaTell') +__version__ = version("WuttaTell") diff --git a/src/wuttatell/app.py b/src/wuttatell/app.py index af045dc..486fe16 100644 --- a/src/wuttatell/app.py +++ b/src/wuttatell/app.py @@ -41,9 +41,11 @@ class WuttaTellAppProvider(AppProvider): :rtype: :class:`~wuttatell.telemetry.TelemetryHandler` """ - if not hasattr(self, 'telemetry_handler'): - spec = self.config.get(f'{self.appname}.telemetry.handler', - default='wuttatell.telemetry:TelemetryHandler') + if not hasattr(self, "telemetry_handler"): + spec = self.config.get( + f"{self.appname}.telemetry.handler", + default="wuttatell.telemetry:TelemetryHandler", + ) factory = self.app.load_object(spec) self.telemetry_handler = factory(self.config, **kwargs) return self.telemetry_handler diff --git a/src/wuttatell/cli/tell.py b/src/wuttatell/cli/tell.py index 1cbe1f6..c540833 100644 --- a/src/wuttatell/cli/tell.py +++ b/src/wuttatell/cli/tell.py @@ -37,18 +37,24 @@ log = logging.getLogger(__name__) @wutta_typer.command() def tell( - ctx: typer.Context, - profile: Annotated[ - str, - typer.Option('--profile', '-p', - help="Profile (type) of telemetry data to collect. " - "This also determines where/how data is submitted. " - "If not specified, default profile is assumed.")] = None, - dry_run: Annotated[ - bool, - typer.Option('--dry-run', - help="Go through all the motions but do not submit " - "the data to server.")] = False, + ctx: typer.Context, + profile: Annotated[ + str, + typer.Option( + "--profile", + "-p", + help="Profile (type) of telemetry data to collect. " + "This also determines where/how data is submitted. " + "If not specified, default profile is assumed.", + ), + ] = None, + dry_run: Annotated[ + bool, + typer.Option( + "--dry-run", + help="Go through all the motions but do not submit " "the data to server.", + ), + ] = False, ): """ Collect and submit telemetry data @@ -58,7 +64,7 @@ def tell( telemetry = app.get_telemetry_handler() data = telemetry.collect_all_data(profile=profile) - log.info("data collected for: %s", ', '.join(sorted(data))) + log.info("data collected for: %s", ", ".join(sorted(data))) log.debug("%s", data) if dry_run: diff --git a/src/wuttatell/client.py b/src/wuttatell/client.py index 1a77496..0b336ce 100644 --- a/src/wuttatell/client.py +++ b/src/wuttatell/client.py @@ -71,23 +71,30 @@ class SimpleAPIClient: API requests. """ - def __init__(self, config, base_url=None, token=None, ssl_verify=None, max_retries=None): + def __init__( + self, config, base_url=None, token=None, ssl_verify=None, max_retries=None + ): self.config = config - self.base_url = base_url or self.config.require(f'{self.config.appname}.api.base_url') - self.base_url = self.base_url.rstrip('/') - self.token = token or self.config.require(f'{self.config.appname}.api.token') + self.base_url = base_url or self.config.require( + f"{self.config.appname}.api.base_url" + ) + self.base_url = self.base_url.rstrip("/") + self.token = token or self.config.require(f"{self.config.appname}.api.token") if max_retries is not None: self.max_retries = max_retries else: - self.max_retries = self.config.get_int(f'{self.config.appname}.api.max_retries') + self.max_retries = self.config.get_int( + f"{self.config.appname}.api.max_retries" + ) if ssl_verify is not None: self.ssl_verify = ssl_verify else: - self.ssl_verify = self.config.get_bool(f'{self.config.appname}.api.ssl_verify', - default=True) + self.ssl_verify = self.config.get_bool( + f"{self.config.appname}.api.ssl_verify", default=True + ) self.session = None @@ -122,14 +129,18 @@ class SimpleAPIClient: # without it, can get error response: # 400 Client Error: Bad CSRF Origin for url parts = urlparse(self.base_url) - self.session.headers.update({ - 'Origin': f'{parts.scheme}://{parts.netloc}', - }) + self.session.headers.update( + { + "Origin": f"{parts.scheme}://{parts.netloc}", + } + ) # authenticate via token only (for now?) - self.session.headers.update({ - 'Authorization': f'Bearer {self.token}', - }) + self.session.headers.update( + { + "Authorization": f"Bearer {self.token}", + } + ) def make_request(self, request_method, api_method, params=None, data=None): """ @@ -153,13 +164,12 @@ class SimpleAPIClient: :rtype: :class:`requests:requests.Response` instance. """ self.init_session() - api_method = api_method.lstrip('/') - url = f'{self.base_url}/{api_method}' - if request_method == 'GET': + api_method = api_method.lstrip("/") + url = f"{self.base_url}/{api_method}" + if request_method == "GET": response = self.session.get(url, params=params) - elif request_method == 'POST': - response = self.session.post(url, params=params, - data=json.dumps(data)) + elif request_method == "POST": + response = self.session.post(url, params=params, data=json.dumps(data)) else: raise NotImplementedError(f"unsupported request method: {request_method}") response.raise_for_status() @@ -180,7 +190,7 @@ class SimpleAPIClient: :rtype: :class:`requests:requests.Response` instance. """ - return self.make_request('GET', api_method, params=params) + return self.make_request("GET", api_method, params=params) def post(self, api_method, **kwargs): """ @@ -194,4 +204,4 @@ class SimpleAPIClient: :rtype: :class:`requests:requests.Response` instance. """ - return self.make_request('POST', api_method, **kwargs) + return self.make_request("POST", api_method, **kwargs) diff --git a/src/wuttatell/telemetry.py b/src/wuttatell/telemetry.py index 28b6b82..fb64214 100644 --- a/src/wuttatell/telemetry.py +++ b/src/wuttatell/telemetry.py @@ -49,7 +49,7 @@ class TelemetryHandler(GenericHandler): if isinstance(profile, TelemetryProfile): return profile - return TelemetryProfile(self.config, profile or 'default') + return TelemetryProfile(self.config, profile or "default") def collect_all_data(self, profile=None): """ @@ -76,7 +76,7 @@ class TelemetryHandler(GenericHandler): profile = self.get_profile(profile) for key in profile.collect_keys: - collector = getattr(self, f'collect_data_{key}') + collector = getattr(self, f"collect_data_{key}") data[key] = collector(profile=profile) self.normalize_errors(data) @@ -87,11 +87,11 @@ class TelemetryHandler(GenericHandler): all_errors = [] for key, value in data.items(): if value: - errors = value.pop('errors', None) + errors = value.pop("errors", None) if errors: all_errors.extend(errors) if all_errors: - data['errors'] = all_errors + data["errors"] = all_errors def collect_data_os(self, profile, **kwargs): """ @@ -119,40 +119,40 @@ class TelemetryHandler(GenericHandler): errors = [] # release - release_path = kwargs.get('release_path', '/etc/os-release') + release_path = kwargs.get("release_path", "/etc/os-release") try: - with open(release_path, 'rt') as f: + with open(release_path, "rt") as f: output = f.read() except: errors.append(f"Failed to read {release_path}") else: release = {} - pattern = re.compile(r'^([^=]+)=(.*)$') - for line in output.strip().split('\n'): + pattern = re.compile(r"^([^=]+)=(.*)$") + for line in output.strip().split("\n"): if match := pattern.match(line): key, val = match.groups() if val.startswith('"') and val.endswith('"'): val = val.strip('"') release[key] = val try: - data['release_id'] = release['ID'] - data['release_version'] = release['VERSION_ID'] - data['release_full'] = release['PRETTY_NAME'] + data["release_id"] = release["ID"] + data["release_version"] = release["VERSION_ID"] + data["release_full"] = release["PRETTY_NAME"] except KeyError: errors.append(f"Failed to parse {release_path}") # timezone - timezone_path = kwargs.get('timezone_path', '/etc/timezone') + timezone_path = kwargs.get("timezone_path", "/etc/timezone") try: - with open(timezone_path, 'rt') as f: + with open(timezone_path, "rt") as f: output = f.read() except: errors.append(f"Failed to read {timezone_path}") else: - data['timezone'] = output.strip() + data["timezone"] = output.strip() if errors: - data['errors'] = errors + data["errors"] = errors return data def collect_data_python(self, profile): @@ -171,7 +171,7 @@ class TelemetryHandler(GenericHandler): "release_full": "Python 3.11.2", "release_version": "3.11.2", } - + If a virtual environment is involved the result will include its root path:: @@ -181,7 +181,7 @@ class TelemetryHandler(GenericHandler): "release_full": "Python 3.11.2", "release_version": "3.11.2", } - + :param profile: :class:`TelemetryProfile` instance. :returns: Data dict similar to the above. May have an @@ -191,31 +191,32 @@ class TelemetryHandler(GenericHandler): errors = [] # envroot determines python executable - envroot = profile.get_str('collect.python.envroot') + envroot = profile.get_str("collect.python.envroot") if envroot: - data['envroot'] = envroot - python = os.path.join(envroot, 'bin/python') + data["envroot"] = envroot + python = os.path.join(envroot, "bin/python") else: - python = profile.get_str('collect.python.executable', - default='/usr/bin/python3') + python = profile.get_str( + "collect.python.executable", default="/usr/bin/python3" + ) # python version - data['executable'] = python + data["executable"] = python try: - output = subprocess.check_output([python, '--version']) + output = subprocess.check_output([python, "--version"]) except (subprocess.CalledProcessError, FileNotFoundError) as err: errors.append("Failed to execute `python --version`") errors.append(str(err)) else: - output = output.decode('utf_8').strip() - data['release_full'] = output - if match := re.match(r'^Python (\d+\.\d+\.\d+)', output): - data['release_version'] = match.group(1) + output = output.decode("utf_8").strip() + data["release_full"] = output + if match := re.match(r"^Python (\d+\.\d+\.\d+)", output): + data["release_version"] = match.group(1) else: errors.append("Failed to parse Python version") if errors: - data['errors'] = errors + data["errors"] = errors return data def submit_all_data(self, profile=None, data=None): @@ -269,6 +270,6 @@ class TelemetryProfile(WuttaConfigProfile): def load(self): """ """ - keys = self.get_str('collect.keys', default='os,python') + keys = self.get_str("collect.keys", default="os,python") self.collect_keys = self.config.parse_list(keys) - self.submit_url = self.get_str('submit.url') + self.submit_url = self.get_str("submit.url") diff --git a/tasks.py b/tasks.py index 58e8ed2..489a5ff 100644 --- a/tasks.py +++ b/tasks.py @@ -15,10 +15,10 @@ def release(c, skip_tests=False): Release a new version of WuttaTell """ if not skip_tests: - c.run('pytest') + c.run("pytest") - if os.path.exists('dist'): - shutil.rmtree('dist') + if os.path.exists("dist"): + shutil.rmtree("dist") - c.run('python -m build --sdist') - c.run('twine upload dist/*') + c.run("python -m build --sdist") + c.run("twine upload dist/*") diff --git a/tests/cli/test_tell.py b/tests/cli/test_tell.py index 0bdafe3..ec7db43 100644 --- a/tests/cli/test_tell.py +++ b/tests/cli/test_tell.py @@ -14,10 +14,10 @@ class TestTell(ConfigTestCase): ctx = Mock() ctx.parent = Mock() ctx.parent.wutta_config = self.config - with patch.object(TelemetryHandler, 'submit_all_data') as submit_all_data: + with patch.object(TelemetryHandler, "submit_all_data") as submit_all_data: # dry run - with patch.object(TelemetryHandler, 'collect_all_data') as collect_all_data: + with patch.object(TelemetryHandler, "collect_all_data") as collect_all_data: mod.tell(ctx, dry_run=True) collect_all_data.assert_called_once_with(profile=None) submit_all_data.assert_not_called() diff --git a/tests/test_client.py b/tests/test_client.py index 95691ee..10e13a5 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -21,33 +21,42 @@ class TestSimpleAPIClient(ConfigTestCase): def test_constructor(self): # caller specifies params - client = self.make_client(base_url='https://example.com/api/', - token='XYZPDQ12345', - ssl_verify=False, - max_retries=5) - self.assertEqual(client.base_url, 'https://example.com/api') # no trailing slash - self.assertEqual(client.token, 'XYZPDQ12345') + client = self.make_client( + base_url="https://example.com/api/", + token="XYZPDQ12345", + ssl_verify=False, + max_retries=5, + ) + self.assertEqual( + client.base_url, "https://example.com/api" + ) # no trailing slash + self.assertEqual(client.token, "XYZPDQ12345") self.assertFalse(client.ssl_verify) self.assertEqual(client.max_retries, 5) self.assertIsNone(client.session) # now with some defaults - client = self.make_client(base_url='https://example.com/api/', - token='XYZPDQ12345') - self.assertEqual(client.base_url, 'https://example.com/api') # no trailing slash - self.assertEqual(client.token, 'XYZPDQ12345') + client = self.make_client( + base_url="https://example.com/api/", token="XYZPDQ12345" + ) + self.assertEqual( + client.base_url, "https://example.com/api" + ) # no trailing slash + self.assertEqual(client.token, "XYZPDQ12345") self.assertTrue(client.ssl_verify) self.assertIsNone(client.max_retries) self.assertIsNone(client.session) # now from config - self.config.setdefault('wutta.api.base_url', 'https://another.com/api/') - self.config.setdefault('wutta.api.token', '9843243q4') - self.config.setdefault('wutta.api.ssl_verify', 'false') - self.config.setdefault('wutta.api.max_retries', '4') + self.config.setdefault("wutta.api.base_url", "https://another.com/api/") + self.config.setdefault("wutta.api.token", "9843243q4") + self.config.setdefault("wutta.api.ssl_verify", "false") + self.config.setdefault("wutta.api.max_retries", "4") client = self.make_client() - self.assertEqual(client.base_url, 'https://another.com/api') # no trailing slash - self.assertEqual(client.token, '9843243q4') + self.assertEqual( + client.base_url, "https://another.com/api" + ) # no trailing slash + self.assertEqual(client.token, "9843243q4") self.assertFalse(client.ssl_verify) self.assertEqual(client.max_retries, 4) self.assertIsNone(client.session) @@ -55,16 +64,18 @@ class TestSimpleAPIClient(ConfigTestCase): def test_init_session(self): # client begins with no session - client = self.make_client(base_url='https://example.com/api', token='1234') + client = self.make_client(base_url="https://example.com/api", token="1234") self.assertIsNone(client.session) # session is created here client.init_session() self.assertIsInstance(client.session, requests.Session) self.assertTrue(client.session.verify) - self.assertTrue(all([a.max_retries.total == 0 for a in client.session.adapters.values()])) - self.assertIn('Authorization', client.session.headers) - self.assertEqual(client.session.headers['Authorization'], 'Bearer 1234') + self.assertTrue( + all([a.max_retries.total == 0 for a in client.session.adapters.values()]) + ) + self.assertIn("Authorization", client.session.headers) + self.assertEqual(client.session.headers["Authorization"], "Bearer 1234") # session is never re-created orig_session = client.session @@ -72,94 +83,124 @@ class TestSimpleAPIClient(ConfigTestCase): self.assertIs(client.session, orig_session) # new client/session with no ssl_verify - client = self.make_client(base_url='https://example.com/api', token='1234', ssl_verify=False) + client = self.make_client( + base_url="https://example.com/api", token="1234", ssl_verify=False + ) client.init_session() self.assertFalse(client.session.verify) # new client/session with max_retries - client = self.make_client(base_url='https://example.com/api', token='1234', max_retries=5) + client = self.make_client( + base_url="https://example.com/api", token="1234", max_retries=5 + ) client.init_session() - self.assertEqual(client.session.adapters['https://example.com/api'].max_retries.total, 5) + self.assertEqual( + client.session.adapters["https://example.com/api"].max_retries.total, 5 + ) def test_make_request_get(self): # start server threading.Thread(target=start_server).start() - while not SERVER['running']: + while not SERVER["running"]: time.sleep(0.02) # server returns our headers - client = self.make_client(base_url=f'http://127.0.0.1:{SERVER["port"]}', token='1234', ssl_verify=False) - response = client.make_request('GET', '/telemetry') + client = self.make_client( + base_url=f'http://127.0.0.1:{SERVER["port"]}', + token="1234", + ssl_verify=False, + ) + response = client.make_request("GET", "/telemetry") result = response.json() - self.assertIn('headers', result) - self.assertIn('Authorization', result['headers']) - self.assertEqual(result['headers']['Authorization'], 'Bearer 1234') - self.assertNotIn('payload', result) + self.assertIn("headers", result) + self.assertIn("Authorization", result["headers"]) + self.assertEqual(result["headers"]["Authorization"], "Bearer 1234") + self.assertNotIn("payload", result) def test_make_request_post(self): # start server threading.Thread(target=start_server).start() - while not SERVER['running']: + while not SERVER["running"]: time.sleep(0.02) # server returns our headers + payload - client = self.make_client(base_url=f'http://127.0.0.1:{SERVER["port"]}', token='1234', ssl_verify=False) - response = client.make_request('POST', '/telemetry', data={'os': {'name': 'debian'}}) + client = self.make_client( + base_url=f'http://127.0.0.1:{SERVER["port"]}', + token="1234", + ssl_verify=False, + ) + response = client.make_request( + "POST", "/telemetry", data={"os": {"name": "debian"}} + ) result = response.json() - self.assertIn('headers', result) - self.assertIn('Authorization', result['headers']) - self.assertEqual(result['headers']['Authorization'], 'Bearer 1234') - self.assertIn('payload', result) - self.assertEqual(json.loads(result['payload']), {'os': {'name': 'debian'}}) + self.assertIn("headers", result) + self.assertIn("Authorization", result["headers"]) + self.assertEqual(result["headers"]["Authorization"], "Bearer 1234") + self.assertIn("payload", result) + self.assertEqual(json.loads(result["payload"]), {"os": {"name": "debian"}}) def test_make_request_unsupported(self): # start server threading.Thread(target=start_server).start() - while not SERVER['running']: + while not SERVER["running"]: time.sleep(0.02) # e.g. DELETE is not implemented - client = self.make_client(base_url=f'http://127.0.0.1:{SERVER["port"]}', token='1234', ssl_verify=False) - self.assertRaises(NotImplementedError, client.make_request, 'DELETE', '/telemetry') + client = self.make_client( + base_url=f'http://127.0.0.1:{SERVER["port"]}', + token="1234", + ssl_verify=False, + ) + self.assertRaises( + NotImplementedError, client.make_request, "DELETE", "/telemetry" + ) # nb. issue valid request to stop the server - client.make_request('GET', '/telemetry') + client.make_request("GET", "/telemetry") def test_get(self): # start server threading.Thread(target=start_server).start() - while not SERVER['running']: + while not SERVER["running"]: time.sleep(0.02) # server returns our headers - client = self.make_client(base_url=f'http://127.0.0.1:{SERVER["port"]}', token='1234', ssl_verify=False) - response = client.get('/telemetry') + client = self.make_client( + base_url=f'http://127.0.0.1:{SERVER["port"]}', + token="1234", + ssl_verify=False, + ) + response = client.get("/telemetry") result = response.json() - self.assertIn('headers', result) - self.assertIn('Authorization', result['headers']) - self.assertEqual(result['headers']['Authorization'], 'Bearer 1234') - self.assertNotIn('payload', result) + self.assertIn("headers", result) + self.assertIn("Authorization", result["headers"]) + self.assertEqual(result["headers"]["Authorization"], "Bearer 1234") + self.assertNotIn("payload", result) def test_post(self): # start server threading.Thread(target=start_server).start() - while not SERVER['running']: + while not SERVER["running"]: time.sleep(0.02) # server returns our headers + payload - client = self.make_client(base_url=f'http://127.0.0.1:{SERVER["port"]}', token='1234', ssl_verify=False) - response = client.post('/telemetry', data={'os': {'name': 'debian'}}) + client = self.make_client( + base_url=f'http://127.0.0.1:{SERVER["port"]}', + token="1234", + ssl_verify=False, + ) + response = client.post("/telemetry", data={"os": {"name": "debian"}}) result = response.json() - self.assertIn('headers', result) - self.assertIn('Authorization', result['headers']) - self.assertEqual(result['headers']['Authorization'], 'Bearer 1234') - self.assertIn('payload', result) - self.assertEqual(json.loads(result['payload']), {'os': {'name': 'debian'}}) + self.assertIn("headers", result) + self.assertIn("Authorization", result["headers"]) + self.assertEqual(result["headers"]["Authorization"], "Bearer 1234") + self.assertIn("payload", result) + self.assertEqual(json.loads(result["payload"]), {"os": {"name": "debian"}}) class FakeRequestHandler(BaseHTTPRequestHandler): @@ -167,37 +208,38 @@ class FakeRequestHandler(BaseHTTPRequestHandler): def do_GET(self): headers = dict([(k, v) for k, v in self.headers.items()]) - result = {'headers': headers} - result = json.dumps(result).encode('utf_8') + result = {"headers": headers} + result = json.dumps(result).encode("utf_8") self.send_response(HTTPStatus.OK) - self.send_header("Content-Type", 'text/json') + self.send_header("Content-Type", "text/json") self.send_header("Content-Length", str(len(result))) self.end_headers() self.wfile.write(result) def do_POST(self): headers = dict([(k, v) for k, v in self.headers.items()]) - length = int(self.headers.get('Content-Length')) - payload = self.rfile.read(length).decode('utf_8') - result = {'headers': headers, 'payload': payload} - result = json.dumps(result).encode('utf_8') + length = int(self.headers.get("Content-Length")) + payload = self.rfile.read(length).decode("utf_8") + result = {"headers": headers, "payload": payload} + result = json.dumps(result).encode("utf_8") self.send_response(HTTPStatus.OK) - self.send_header("Content-Type", 'text/json') + self.send_header("Content-Type", "text/json") self.send_header("Content-Length", str(len(result))) self.end_headers() self.wfile.write(result) -SERVER = {'running': False, 'port': 7314} +SERVER = {"running": False, "port": 7314} + def start_server(): - if SERVER['running']: + if SERVER["running"]: raise RuntimeError("http server is already running") - with HTTPServer(('127.0.0.1', SERVER['port']), FakeRequestHandler) as httpd: - SERVER['running'] = True + with HTTPServer(("127.0.0.1", SERVER["port"]), FakeRequestHandler) as httpd: + SERVER["running"] = True httpd.handle_request() - SERVER['running'] = False + SERVER["running"] = False diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index fb40b23..e5567bc 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -19,193 +19,208 @@ class TestTelemetryHandler(ConfigTestCase): def test_get_profile(self): # default - default = self.handler.get_profile('default') + default = self.handler.get_profile("default") self.assertIsInstance(default, mod.TelemetryProfile) - self.assertEqual(default.key, 'default') + self.assertEqual(default.key, "default") # same profile is returned profile = self.handler.get_profile(default) self.assertIs(profile, default) def test_collect_data_os(self): - profile = self.handler.get_profile('default') + profile = self.handler.get_profile("default") # typical / working scenario data = self.handler.collect_data_os(profile) self.assertIsInstance(data, dict) - self.assertIn('release_id', data) - self.assertIn('release_version', data) - self.assertIn('release_full', data) - self.assertIn('timezone', data) - self.assertNotIn('errors', data) + self.assertIn("release_id", data) + self.assertIn("release_version", data) + self.assertIn("release_full", data) + self.assertIn("timezone", data) + self.assertNotIn("errors", data) # unreadable release path - data = self.handler.collect_data_os(profile, release_path='/a/path/which/does/not/exist') + data = self.handler.collect_data_os( + profile, release_path="/a/path/which/does/not/exist" + ) self.assertIsInstance(data, dict) - self.assertNotIn('release_id', data) - self.assertNotIn('release_version', data) - self.assertNotIn('release_full', data) - self.assertIn('timezone', data) - self.assertIn('errors', data) - self.assertEqual(data['errors'], [ - "Failed to read /a/path/which/does/not/exist" - ]) + self.assertNotIn("release_id", data) + self.assertNotIn("release_version", data) + self.assertNotIn("release_full", data) + self.assertIn("timezone", data) + self.assertIn("errors", data) + self.assertEqual( + data["errors"], ["Failed to read /a/path/which/does/not/exist"] + ) # unparsable release path - path = self.write_file('release', "bad-content") + path = self.write_file("release", "bad-content") data = self.handler.collect_data_os(profile, release_path=path) self.assertIsInstance(data, dict) - self.assertNotIn('release_id', data) - self.assertNotIn('release_version', data) - self.assertNotIn('release_full', data) - self.assertIn('timezone', data) - self.assertIn('errors', data) - self.assertEqual(data['errors'], [ - f"Failed to parse {path}" - ]) + self.assertNotIn("release_id", data) + self.assertNotIn("release_version", data) + self.assertNotIn("release_full", data) + self.assertIn("timezone", data) + self.assertIn("errors", data) + self.assertEqual(data["errors"], [f"Failed to parse {path}"]) # unreadable timezone path - data = self.handler.collect_data_os(profile, timezone_path='/a/path/which/does/not/exist') + data = self.handler.collect_data_os( + profile, timezone_path="/a/path/which/does/not/exist" + ) self.assertIsInstance(data, dict) - self.assertIn('release_id', data) - self.assertIn('release_version', data) - self.assertIn('release_full', data) - self.assertNotIn('timezone', data) - self.assertIn('errors', data) - self.assertEqual(data['errors'], [ - "Failed to read /a/path/which/does/not/exist" - ]) + self.assertIn("release_id", data) + self.assertIn("release_version", data) + self.assertIn("release_full", data) + self.assertNotIn("timezone", data) + self.assertIn("errors", data) + self.assertEqual( + data["errors"], ["Failed to read /a/path/which/does/not/exist"] + ) def test_collect_data_python(self): - profile = self.handler.get_profile('default') + profile = self.handler.get_profile("default") # typical / working (system-wide) scenario data = self.handler.collect_data_python(profile) self.assertIsInstance(data, dict) - self.assertNotIn('envroot', data) - self.assertIn('executable', data) - self.assertIn('release_full', data) - self.assertIn('release_version', data) - self.assertNotIn('errors', data) + self.assertNotIn("envroot", data) + self.assertIn("executable", data) + self.assertIn("release_full", data) + self.assertIn("release_version", data) + self.assertNotIn("errors", data) # missing executable - with patch.dict(self.config.defaults, {'wutta.telemetry.default.collect.python.executable': '/bad/path'}): + with patch.dict( + self.config.defaults, + {"wutta.telemetry.default.collect.python.executable": "/bad/path"}, + ): data = self.handler.collect_data_python(profile) self.assertIsInstance(data, dict) - self.assertNotIn('envroot', data) - self.assertIn('executable', data) - self.assertNotIn('release_full', data) - self.assertNotIn('release_version', data) - self.assertIn('errors', data) - self.assertEqual(data['errors'][0], "Failed to execute `python --version`") + self.assertNotIn("envroot", data) + self.assertIn("executable", data) + self.assertNotIn("release_full", data) + self.assertNotIn("release_version", data) + self.assertIn("errors", data) + self.assertEqual(data["errors"][0], "Failed to execute `python --version`") # unparsable executable output - with patch.object(mod, 'subprocess') as subprocess: - subprocess.check_output.return_value = 'bad output'.encode('utf_8') + with patch.object(mod, "subprocess") as subprocess: + subprocess.check_output.return_value = "bad output".encode("utf_8") data = self.handler.collect_data_python(profile) self.assertIsInstance(data, dict) - self.assertNotIn('envroot', data) - self.assertIn('executable', data) - self.assertIn('release_full', data) - self.assertNotIn('release_version', data) - self.assertIn('errors', data) - self.assertEqual(data['errors'], [ - "Failed to parse Python version", - ]) + self.assertNotIn("envroot", data) + self.assertIn("executable", data) + self.assertIn("release_full", data) + self.assertNotIn("release_version", data) + self.assertIn("errors", data) + self.assertEqual( + data["errors"], + [ + "Failed to parse Python version", + ], + ) # typical / working (virtual environment) scenario - self.config.setdefault('wutta.telemetry.default.collect.python.envroot', '/srv/envs/poser') + self.config.setdefault( + "wutta.telemetry.default.collect.python.envroot", "/srv/envs/poser" + ) data = self.handler.collect_data_python(profile) self.assertIsInstance(data, dict) - self.assertIn('executable', data) - self.assertEqual(data['executable'], '/srv/envs/poser/bin/python') - self.assertNotIn('release_full', data) - self.assertNotIn('release_version', data) - self.assertIn('errors', data) - self.assertEqual(data['errors'][0], "Failed to execute `python --version`") + self.assertIn("executable", data) + self.assertEqual(data["executable"], "/srv/envs/poser/bin/python") + self.assertNotIn("release_full", data) + self.assertNotIn("release_version", data) + self.assertIn("errors", data) + self.assertEqual(data["errors"][0], "Failed to execute `python --version`") def test_normalize_errors(self): data = { - 'os': { - 'timezone': 'America/Chicago', - 'errors': [ + "os": { + "timezone": "America/Chicago", + "errors": [ "Failed to read /etc/os-release", ], }, - 'python': { - 'executable': '/usr/bin/python3', - 'errors': [ + "python": { + "executable": "/usr/bin/python3", + "errors": [ "Failed to run `python --version`", ], }, } self.handler.normalize_errors(data) - self.assertIn('os', data) - self.assertIn('python', data) - self.assertIn('errors', data) - self.assertEqual(data['errors'], [ - "Failed to read /etc/os-release", - "Failed to run `python --version`", - ]) + self.assertIn("os", data) + self.assertIn("python", data) + self.assertIn("errors", data) + self.assertEqual( + data["errors"], + [ + "Failed to read /etc/os-release", + "Failed to run `python --version`", + ], + ) def test_collect_all_data(self): # typical / working scenario data = self.handler.collect_all_data() self.assertIsInstance(data, dict) - self.assertIn('os', data) - self.assertIn('python', data) - self.assertNotIn('errors', data) + self.assertIn("os", data) + self.assertIn("python", data) + self.assertNotIn("errors", data) def test_submit_all_data(self): - profile = self.handler.get_profile('default') - profile.submit_url = '/testing' + profile = self.handler.get_profile("default") + profile.submit_url = "/testing" - with patch.object(mod, 'SimpleAPIClient') as SimpleAPIClient: + with patch.object(mod, "SimpleAPIClient") as SimpleAPIClient: client = MagicMock() SimpleAPIClient.return_value = client # collecting all data - with patch.object(self.handler, 'collect_all_data') as collect_all_data: + with patch.object(self.handler, "collect_all_data") as collect_all_data: collect_all_data.return_value = [] self.handler.submit_all_data(profile) collect_all_data.assert_called_once_with(profile) - client.post.assert_called_once_with('/testing', data=[]) + client.post.assert_called_once_with("/testing", data=[]) # use data from caller client.post.reset_mock() - self.handler.submit_all_data(profile, data=['foo']) - client.post.assert_called_once_with('/testing', data=['foo']) + self.handler.submit_all_data(profile, data=["foo"]) + client.post.assert_called_once_with("/testing", data=["foo"]) class TestTelemetryProfile(ConfigTestCase): - def make_profile(self, key='default'): + def make_profile(self, key="default"): return mod.TelemetryProfile(self.config, key) def test_section(self): # default profile = self.make_profile() - self.assertEqual(profile.section, 'wutta.telemetry') + self.assertEqual(profile.section, "wutta.telemetry") # custom appname - with patch.object(self.config, 'appname', new='wuttatest'): + with patch.object(self.config, "appname", new="wuttatest"): profile = self.make_profile() - self.assertEqual(profile.section, 'wuttatest.telemetry') + self.assertEqual(profile.section, "wuttatest.telemetry") def test_load(self): # defaults profile = self.make_profile() - self.assertEqual(profile.collect_keys, ['os', 'python']) + self.assertEqual(profile.collect_keys, ["os", "python"]) self.assertIsNone(profile.submit_url) # configured - self.config.setdefault('wutta.telemetry.default.collect.keys', 'os,network,python') - self.config.setdefault('wutta.telemetry.default.submit.url', '/nodes/telemetry') + self.config.setdefault( + "wutta.telemetry.default.collect.keys", "os,network,python" + ) + self.config.setdefault("wutta.telemetry.default.submit.url", "/nodes/telemetry") profile = self.make_profile() - self.assertEqual(profile.collect_keys, ['os', 'network', 'python']) - self.assertEqual(profile.submit_url, '/nodes/telemetry') + self.assertEqual(profile.collect_keys, ["os", "network", "python"]) + self.assertEqual(profile.submit_url, "/nodes/telemetry")