import random
import sys
import threading
-from typing import List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple
# These workloads are things that can be requested to run inside the
max=42),
]
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
self._event = threading.Event()
- self._workload = None
- self._health = {}
+ self._workload: Optional[Workload] = None
+ self._health: Dict[str, Dict[str, Any]] = {}
@CLICommand('mgr self-test python-version', perm='r')
def python_version(self) -> Tuple[int, str, str]:
self.remote("insights", "testing_set_now_time_offset", hours)
return 0, "", ""
- def _self_test(self):
+ def _self_test(self) -> None:
self.log.info("Running self-test procedure...")
self._self_test_osdmap()
self._self_test_misc()
self._self_test_perf_counters()
- def _self_test_getters(self):
+ def _self_test_getters(self) -> None:
self.version
self.get_context()
self.get_mgr_id()
self.get_daemon_status("osd", "0")
- def _self_test_config(self):
+ def _self_test_config(self) -> None:
# This is not a strong test (can't tell if values really
# persisted), it's just for the python interface bit.
assert isinstance(value, bool)
assert value is True
- def _self_test_store(self):
+ def _self_test_store(self) -> None:
existing_keys = set(self.get_store_prefix("test").keys())
self.set_store("testkey", "testvalue")
assert self.get_store("testkey") == "testvalue"
assert (set(self.get_store_prefix("test").keys())
== {"testkey"} | existing_keys)
- def _self_test_perf_counters(self):
+ def _self_test_perf_counters(self) -> None:
self.get_perf_schema("osd", "0")
self.get_counter("osd", "0", "osd.op")
# get_counter
# get_all_perf_coutners
- def _self_test_misc(self):
+ def _self_test_misc(self) -> None:
self.set_uri("http://this.is.a.test.com")
self.set_health_checks({})
- def _self_test_osdmap(self):
+ def _self_test_osdmap(self) -> None:
osdmap = self.get_osdmap()
osdmap.get_epoch()
osdmap.get_crush_version()
self.log.info("Finished self-test procedure.")
- def _test_remote_calls(self):
+ def _test_remote_calls(self) -> None:
# Test making valid call
self.remote("influx", "self_test")
else:
raise RuntimeError("KeyError not raised")
- def remote_from_orchestrator_cli_self_test(self, what):
+ def remote_from_orchestrator_cli_self_test(self, what: str) -> Any:
import orchestrator
if what == 'OrchestratorError':
- c = orchestrator.OrchResult(result=None, exception=orchestrator.OrchestratorError('hello, world'))
- return c
+ return orchestrator.OrchResult(result=None, exception=orchestrator.OrchestratorError('hello, world'))
elif what == "ZeroDivisionError":
- c = orchestrator.OrchResult(result=None, exception=ZeroDivisionError('hello, world'))
- return c
+ return orchestrator.OrchResult(result=None, exception=ZeroDivisionError('hello, world'))
assert False, repr(what)
- def shutdown(self):
+ def shutdown(self) -> None:
self._workload = Workload.SHUTDOWN
self._event.set()
- def _command_spam(self):
+ def _command_spam(self) -> None:
self.log.info("Starting command_spam workload...")
while not self._event.is_set():
osdmap = self.get_osdmap()
self._event.clear()
self.log.info("Ended command_spam workload...")
- def serve(self):
+ def serve(self) -> None:
while True:
if self._workload == Workload.COMMAND_SPAM:
self._command_spam()