fix: format all code with black

and from now on should not deviate from that...
This commit is contained in:
Lance Edgar 2025-08-31 12:47:46 -05:00
parent 18f7fa6c51
commit 8be1b66c9e
10 changed files with 334 additions and 258 deletions

View file

@ -14,10 +14,10 @@ class TestTell(ConfigTestCase):
ctx = Mock()
ctx.parent = Mock()
ctx.parent.wutta_config = self.config
with patch.object(TelemetryHandler, 'submit_all_data') as submit_all_data:
with patch.object(TelemetryHandler, "submit_all_data") as submit_all_data:
# dry run
with patch.object(TelemetryHandler, 'collect_all_data') as collect_all_data:
with patch.object(TelemetryHandler, "collect_all_data") as collect_all_data:
mod.tell(ctx, dry_run=True)
collect_all_data.assert_called_once_with(profile=None)
submit_all_data.assert_not_called()

View file

@ -21,33 +21,42 @@ class TestSimpleAPIClient(ConfigTestCase):
def test_constructor(self):
# caller specifies params
client = self.make_client(base_url='https://example.com/api/',
token='XYZPDQ12345',
ssl_verify=False,
max_retries=5)
self.assertEqual(client.base_url, 'https://example.com/api') # no trailing slash
self.assertEqual(client.token, 'XYZPDQ12345')
client = self.make_client(
base_url="https://example.com/api/",
token="XYZPDQ12345",
ssl_verify=False,
max_retries=5,
)
self.assertEqual(
client.base_url, "https://example.com/api"
) # no trailing slash
self.assertEqual(client.token, "XYZPDQ12345")
self.assertFalse(client.ssl_verify)
self.assertEqual(client.max_retries, 5)
self.assertIsNone(client.session)
# now with some defaults
client = self.make_client(base_url='https://example.com/api/',
token='XYZPDQ12345')
self.assertEqual(client.base_url, 'https://example.com/api') # no trailing slash
self.assertEqual(client.token, 'XYZPDQ12345')
client = self.make_client(
base_url="https://example.com/api/", token="XYZPDQ12345"
)
self.assertEqual(
client.base_url, "https://example.com/api"
) # no trailing slash
self.assertEqual(client.token, "XYZPDQ12345")
self.assertTrue(client.ssl_verify)
self.assertIsNone(client.max_retries)
self.assertIsNone(client.session)
# now from config
self.config.setdefault('wutta.api.base_url', 'https://another.com/api/')
self.config.setdefault('wutta.api.token', '9843243q4')
self.config.setdefault('wutta.api.ssl_verify', 'false')
self.config.setdefault('wutta.api.max_retries', '4')
self.config.setdefault("wutta.api.base_url", "https://another.com/api/")
self.config.setdefault("wutta.api.token", "9843243q4")
self.config.setdefault("wutta.api.ssl_verify", "false")
self.config.setdefault("wutta.api.max_retries", "4")
client = self.make_client()
self.assertEqual(client.base_url, 'https://another.com/api') # no trailing slash
self.assertEqual(client.token, '9843243q4')
self.assertEqual(
client.base_url, "https://another.com/api"
) # no trailing slash
self.assertEqual(client.token, "9843243q4")
self.assertFalse(client.ssl_verify)
self.assertEqual(client.max_retries, 4)
self.assertIsNone(client.session)
@ -55,16 +64,18 @@ class TestSimpleAPIClient(ConfigTestCase):
def test_init_session(self):
# client begins with no session
client = self.make_client(base_url='https://example.com/api', token='1234')
client = self.make_client(base_url="https://example.com/api", token="1234")
self.assertIsNone(client.session)
# session is created here
client.init_session()
self.assertIsInstance(client.session, requests.Session)
self.assertTrue(client.session.verify)
self.assertTrue(all([a.max_retries.total == 0 for a in client.session.adapters.values()]))
self.assertIn('Authorization', client.session.headers)
self.assertEqual(client.session.headers['Authorization'], 'Bearer 1234')
self.assertTrue(
all([a.max_retries.total == 0 for a in client.session.adapters.values()])
)
self.assertIn("Authorization", client.session.headers)
self.assertEqual(client.session.headers["Authorization"], "Bearer 1234")
# session is never re-created
orig_session = client.session
@ -72,94 +83,124 @@ class TestSimpleAPIClient(ConfigTestCase):
self.assertIs(client.session, orig_session)
# new client/session with no ssl_verify
client = self.make_client(base_url='https://example.com/api', token='1234', ssl_verify=False)
client = self.make_client(
base_url="https://example.com/api", token="1234", ssl_verify=False
)
client.init_session()
self.assertFalse(client.session.verify)
# new client/session with max_retries
client = self.make_client(base_url='https://example.com/api', token='1234', max_retries=5)
client = self.make_client(
base_url="https://example.com/api", token="1234", max_retries=5
)
client.init_session()
self.assertEqual(client.session.adapters['https://example.com/api'].max_retries.total, 5)
self.assertEqual(
client.session.adapters["https://example.com/api"].max_retries.total, 5
)
def test_make_request_get(self):
# start server
threading.Thread(target=start_server).start()
while not SERVER['running']:
while not SERVER["running"]:
time.sleep(0.02)
# server returns our headers
client = self.make_client(base_url=f'http://127.0.0.1:{SERVER["port"]}', token='1234', ssl_verify=False)
response = client.make_request('GET', '/telemetry')
client = self.make_client(
base_url=f'http://127.0.0.1:{SERVER["port"]}',
token="1234",
ssl_verify=False,
)
response = client.make_request("GET", "/telemetry")
result = response.json()
self.assertIn('headers', result)
self.assertIn('Authorization', result['headers'])
self.assertEqual(result['headers']['Authorization'], 'Bearer 1234')
self.assertNotIn('payload', result)
self.assertIn("headers", result)
self.assertIn("Authorization", result["headers"])
self.assertEqual(result["headers"]["Authorization"], "Bearer 1234")
self.assertNotIn("payload", result)
def test_make_request_post(self):
# start server
threading.Thread(target=start_server).start()
while not SERVER['running']:
while not SERVER["running"]:
time.sleep(0.02)
# server returns our headers + payload
client = self.make_client(base_url=f'http://127.0.0.1:{SERVER["port"]}', token='1234', ssl_verify=False)
response = client.make_request('POST', '/telemetry', data={'os': {'name': 'debian'}})
client = self.make_client(
base_url=f'http://127.0.0.1:{SERVER["port"]}',
token="1234",
ssl_verify=False,
)
response = client.make_request(
"POST", "/telemetry", data={"os": {"name": "debian"}}
)
result = response.json()
self.assertIn('headers', result)
self.assertIn('Authorization', result['headers'])
self.assertEqual(result['headers']['Authorization'], 'Bearer 1234')
self.assertIn('payload', result)
self.assertEqual(json.loads(result['payload']), {'os': {'name': 'debian'}})
self.assertIn("headers", result)
self.assertIn("Authorization", result["headers"])
self.assertEqual(result["headers"]["Authorization"], "Bearer 1234")
self.assertIn("payload", result)
self.assertEqual(json.loads(result["payload"]), {"os": {"name": "debian"}})
def test_make_request_unsupported(self):
# start server
threading.Thread(target=start_server).start()
while not SERVER['running']:
while not SERVER["running"]:
time.sleep(0.02)
# e.g. DELETE is not implemented
client = self.make_client(base_url=f'http://127.0.0.1:{SERVER["port"]}', token='1234', ssl_verify=False)
self.assertRaises(NotImplementedError, client.make_request, 'DELETE', '/telemetry')
client = self.make_client(
base_url=f'http://127.0.0.1:{SERVER["port"]}',
token="1234",
ssl_verify=False,
)
self.assertRaises(
NotImplementedError, client.make_request, "DELETE", "/telemetry"
)
# nb. issue valid request to stop the server
client.make_request('GET', '/telemetry')
client.make_request("GET", "/telemetry")
def test_get(self):
# start server
threading.Thread(target=start_server).start()
while not SERVER['running']:
while not SERVER["running"]:
time.sleep(0.02)
# server returns our headers
client = self.make_client(base_url=f'http://127.0.0.1:{SERVER["port"]}', token='1234', ssl_verify=False)
response = client.get('/telemetry')
client = self.make_client(
base_url=f'http://127.0.0.1:{SERVER["port"]}',
token="1234",
ssl_verify=False,
)
response = client.get("/telemetry")
result = response.json()
self.assertIn('headers', result)
self.assertIn('Authorization', result['headers'])
self.assertEqual(result['headers']['Authorization'], 'Bearer 1234')
self.assertNotIn('payload', result)
self.assertIn("headers", result)
self.assertIn("Authorization", result["headers"])
self.assertEqual(result["headers"]["Authorization"], "Bearer 1234")
self.assertNotIn("payload", result)
def test_post(self):
# start server
threading.Thread(target=start_server).start()
while not SERVER['running']:
while not SERVER["running"]:
time.sleep(0.02)
# server returns our headers + payload
client = self.make_client(base_url=f'http://127.0.0.1:{SERVER["port"]}', token='1234', ssl_verify=False)
response = client.post('/telemetry', data={'os': {'name': 'debian'}})
client = self.make_client(
base_url=f'http://127.0.0.1:{SERVER["port"]}',
token="1234",
ssl_verify=False,
)
response = client.post("/telemetry", data={"os": {"name": "debian"}})
result = response.json()
self.assertIn('headers', result)
self.assertIn('Authorization', result['headers'])
self.assertEqual(result['headers']['Authorization'], 'Bearer 1234')
self.assertIn('payload', result)
self.assertEqual(json.loads(result['payload']), {'os': {'name': 'debian'}})
self.assertIn("headers", result)
self.assertIn("Authorization", result["headers"])
self.assertEqual(result["headers"]["Authorization"], "Bearer 1234")
self.assertIn("payload", result)
self.assertEqual(json.loads(result["payload"]), {"os": {"name": "debian"}})
class FakeRequestHandler(BaseHTTPRequestHandler):
@ -167,37 +208,38 @@ class FakeRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
headers = dict([(k, v) for k, v in self.headers.items()])
result = {'headers': headers}
result = json.dumps(result).encode('utf_8')
result = {"headers": headers}
result = json.dumps(result).encode("utf_8")
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", 'text/json')
self.send_header("Content-Type", "text/json")
self.send_header("Content-Length", str(len(result)))
self.end_headers()
self.wfile.write(result)
def do_POST(self):
headers = dict([(k, v) for k, v in self.headers.items()])
length = int(self.headers.get('Content-Length'))
payload = self.rfile.read(length).decode('utf_8')
result = {'headers': headers, 'payload': payload}
result = json.dumps(result).encode('utf_8')
length = int(self.headers.get("Content-Length"))
payload = self.rfile.read(length).decode("utf_8")
result = {"headers": headers, "payload": payload}
result = json.dumps(result).encode("utf_8")
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", 'text/json')
self.send_header("Content-Type", "text/json")
self.send_header("Content-Length", str(len(result)))
self.end_headers()
self.wfile.write(result)
SERVER = {'running': False, 'port': 7314}
SERVER = {"running": False, "port": 7314}
def start_server():
if SERVER['running']:
if SERVER["running"]:
raise RuntimeError("http server is already running")
with HTTPServer(('127.0.0.1', SERVER['port']), FakeRequestHandler) as httpd:
SERVER['running'] = True
with HTTPServer(("127.0.0.1", SERVER["port"]), FakeRequestHandler) as httpd:
SERVER["running"] = True
httpd.handle_request()
SERVER['running'] = False
SERVER["running"] = False

View file

@ -19,193 +19,208 @@ class TestTelemetryHandler(ConfigTestCase):
def test_get_profile(self):
# default
default = self.handler.get_profile('default')
default = self.handler.get_profile("default")
self.assertIsInstance(default, mod.TelemetryProfile)
self.assertEqual(default.key, 'default')
self.assertEqual(default.key, "default")
# same profile is returned
profile = self.handler.get_profile(default)
self.assertIs(profile, default)
def test_collect_data_os(self):
profile = self.handler.get_profile('default')
profile = self.handler.get_profile("default")
# typical / working scenario
data = self.handler.collect_data_os(profile)
self.assertIsInstance(data, dict)
self.assertIn('release_id', data)
self.assertIn('release_version', data)
self.assertIn('release_full', data)
self.assertIn('timezone', data)
self.assertNotIn('errors', data)
self.assertIn("release_id", data)
self.assertIn("release_version", data)
self.assertIn("release_full", data)
self.assertIn("timezone", data)
self.assertNotIn("errors", data)
# unreadable release path
data = self.handler.collect_data_os(profile, release_path='/a/path/which/does/not/exist')
data = self.handler.collect_data_os(
profile, release_path="/a/path/which/does/not/exist"
)
self.assertIsInstance(data, dict)
self.assertNotIn('release_id', data)
self.assertNotIn('release_version', data)
self.assertNotIn('release_full', data)
self.assertIn('timezone', data)
self.assertIn('errors', data)
self.assertEqual(data['errors'], [
"Failed to read /a/path/which/does/not/exist"
])
self.assertNotIn("release_id", data)
self.assertNotIn("release_version", data)
self.assertNotIn("release_full", data)
self.assertIn("timezone", data)
self.assertIn("errors", data)
self.assertEqual(
data["errors"], ["Failed to read /a/path/which/does/not/exist"]
)
# unparsable release path
path = self.write_file('release', "bad-content")
path = self.write_file("release", "bad-content")
data = self.handler.collect_data_os(profile, release_path=path)
self.assertIsInstance(data, dict)
self.assertNotIn('release_id', data)
self.assertNotIn('release_version', data)
self.assertNotIn('release_full', data)
self.assertIn('timezone', data)
self.assertIn('errors', data)
self.assertEqual(data['errors'], [
f"Failed to parse {path}"
])
self.assertNotIn("release_id", data)
self.assertNotIn("release_version", data)
self.assertNotIn("release_full", data)
self.assertIn("timezone", data)
self.assertIn("errors", data)
self.assertEqual(data["errors"], [f"Failed to parse {path}"])
# unreadable timezone path
data = self.handler.collect_data_os(profile, timezone_path='/a/path/which/does/not/exist')
data = self.handler.collect_data_os(
profile, timezone_path="/a/path/which/does/not/exist"
)
self.assertIsInstance(data, dict)
self.assertIn('release_id', data)
self.assertIn('release_version', data)
self.assertIn('release_full', data)
self.assertNotIn('timezone', data)
self.assertIn('errors', data)
self.assertEqual(data['errors'], [
"Failed to read /a/path/which/does/not/exist"
])
self.assertIn("release_id", data)
self.assertIn("release_version", data)
self.assertIn("release_full", data)
self.assertNotIn("timezone", data)
self.assertIn("errors", data)
self.assertEqual(
data["errors"], ["Failed to read /a/path/which/does/not/exist"]
)
def test_collect_data_python(self):
profile = self.handler.get_profile('default')
profile = self.handler.get_profile("default")
# typical / working (system-wide) scenario
data = self.handler.collect_data_python(profile)
self.assertIsInstance(data, dict)
self.assertNotIn('envroot', data)
self.assertIn('executable', data)
self.assertIn('release_full', data)
self.assertIn('release_version', data)
self.assertNotIn('errors', data)
self.assertNotIn("envroot", data)
self.assertIn("executable", data)
self.assertIn("release_full", data)
self.assertIn("release_version", data)
self.assertNotIn("errors", data)
# missing executable
with patch.dict(self.config.defaults, {'wutta.telemetry.default.collect.python.executable': '/bad/path'}):
with patch.dict(
self.config.defaults,
{"wutta.telemetry.default.collect.python.executable": "/bad/path"},
):
data = self.handler.collect_data_python(profile)
self.assertIsInstance(data, dict)
self.assertNotIn('envroot', data)
self.assertIn('executable', data)
self.assertNotIn('release_full', data)
self.assertNotIn('release_version', data)
self.assertIn('errors', data)
self.assertEqual(data['errors'][0], "Failed to execute `python --version`")
self.assertNotIn("envroot", data)
self.assertIn("executable", data)
self.assertNotIn("release_full", data)
self.assertNotIn("release_version", data)
self.assertIn("errors", data)
self.assertEqual(data["errors"][0], "Failed to execute `python --version`")
# unparsable executable output
with patch.object(mod, 'subprocess') as subprocess:
subprocess.check_output.return_value = 'bad output'.encode('utf_8')
with patch.object(mod, "subprocess") as subprocess:
subprocess.check_output.return_value = "bad output".encode("utf_8")
data = self.handler.collect_data_python(profile)
self.assertIsInstance(data, dict)
self.assertNotIn('envroot', data)
self.assertIn('executable', data)
self.assertIn('release_full', data)
self.assertNotIn('release_version', data)
self.assertIn('errors', data)
self.assertEqual(data['errors'], [
"Failed to parse Python version",
])
self.assertNotIn("envroot", data)
self.assertIn("executable", data)
self.assertIn("release_full", data)
self.assertNotIn("release_version", data)
self.assertIn("errors", data)
self.assertEqual(
data["errors"],
[
"Failed to parse Python version",
],
)
# typical / working (virtual environment) scenario
self.config.setdefault('wutta.telemetry.default.collect.python.envroot', '/srv/envs/poser')
self.config.setdefault(
"wutta.telemetry.default.collect.python.envroot", "/srv/envs/poser"
)
data = self.handler.collect_data_python(profile)
self.assertIsInstance(data, dict)
self.assertIn('executable', data)
self.assertEqual(data['executable'], '/srv/envs/poser/bin/python')
self.assertNotIn('release_full', data)
self.assertNotIn('release_version', data)
self.assertIn('errors', data)
self.assertEqual(data['errors'][0], "Failed to execute `python --version`")
self.assertIn("executable", data)
self.assertEqual(data["executable"], "/srv/envs/poser/bin/python")
self.assertNotIn("release_full", data)
self.assertNotIn("release_version", data)
self.assertIn("errors", data)
self.assertEqual(data["errors"][0], "Failed to execute `python --version`")
def test_normalize_errors(self):
data = {
'os': {
'timezone': 'America/Chicago',
'errors': [
"os": {
"timezone": "America/Chicago",
"errors": [
"Failed to read /etc/os-release",
],
},
'python': {
'executable': '/usr/bin/python3',
'errors': [
"python": {
"executable": "/usr/bin/python3",
"errors": [
"Failed to run `python --version`",
],
},
}
self.handler.normalize_errors(data)
self.assertIn('os', data)
self.assertIn('python', data)
self.assertIn('errors', data)
self.assertEqual(data['errors'], [
"Failed to read /etc/os-release",
"Failed to run `python --version`",
])
self.assertIn("os", data)
self.assertIn("python", data)
self.assertIn("errors", data)
self.assertEqual(
data["errors"],
[
"Failed to read /etc/os-release",
"Failed to run `python --version`",
],
)
def test_collect_all_data(self):
# typical / working scenario
data = self.handler.collect_all_data()
self.assertIsInstance(data, dict)
self.assertIn('os', data)
self.assertIn('python', data)
self.assertNotIn('errors', data)
self.assertIn("os", data)
self.assertIn("python", data)
self.assertNotIn("errors", data)
def test_submit_all_data(self):
profile = self.handler.get_profile('default')
profile.submit_url = '/testing'
profile = self.handler.get_profile("default")
profile.submit_url = "/testing"
with patch.object(mod, 'SimpleAPIClient') as SimpleAPIClient:
with patch.object(mod, "SimpleAPIClient") as SimpleAPIClient:
client = MagicMock()
SimpleAPIClient.return_value = client
# collecting all data
with patch.object(self.handler, 'collect_all_data') as collect_all_data:
with patch.object(self.handler, "collect_all_data") as collect_all_data:
collect_all_data.return_value = []
self.handler.submit_all_data(profile)
collect_all_data.assert_called_once_with(profile)
client.post.assert_called_once_with('/testing', data=[])
client.post.assert_called_once_with("/testing", data=[])
# use data from caller
client.post.reset_mock()
self.handler.submit_all_data(profile, data=['foo'])
client.post.assert_called_once_with('/testing', data=['foo'])
self.handler.submit_all_data(profile, data=["foo"])
client.post.assert_called_once_with("/testing", data=["foo"])
class TestTelemetryProfile(ConfigTestCase):
def make_profile(self, key='default'):
def make_profile(self, key="default"):
return mod.TelemetryProfile(self.config, key)
def test_section(self):
# default
profile = self.make_profile()
self.assertEqual(profile.section, 'wutta.telemetry')
self.assertEqual(profile.section, "wutta.telemetry")
# custom appname
with patch.object(self.config, 'appname', new='wuttatest'):
with patch.object(self.config, "appname", new="wuttatest"):
profile = self.make_profile()
self.assertEqual(profile.section, 'wuttatest.telemetry')
self.assertEqual(profile.section, "wuttatest.telemetry")
def test_load(self):
# defaults
profile = self.make_profile()
self.assertEqual(profile.collect_keys, ['os', 'python'])
self.assertEqual(profile.collect_keys, ["os", "python"])
self.assertIsNone(profile.submit_url)
# configured
self.config.setdefault('wutta.telemetry.default.collect.keys', 'os,network,python')
self.config.setdefault('wutta.telemetry.default.submit.url', '/nodes/telemetry')
self.config.setdefault(
"wutta.telemetry.default.collect.keys", "os,network,python"
)
self.config.setdefault("wutta.telemetry.default.submit.url", "/nodes/telemetry")
profile = self.make_profile()
self.assertEqual(profile.collect_keys, ['os', 'network', 'python'])
self.assertEqual(profile.submit_url, '/nodes/telemetry')
self.assertEqual(profile.collect_keys, ["os", "network", "python"])
self.assertEqual(profile.submit_url, "/nodes/telemetry")