-from mgr_module import MgrModule, CommandResult, Option
+from mgr_module import MgrModule, CommandResult, CLICommand, Option
import errno
import json
import random
import sys
import threading
+from typing import List, Optional, Tuple
class Module(MgrModule):
max=42),
]
- COMMANDS = [
- {
- "cmd": "mgr self-test run",
- "desc": "Run mgr python interface tests",
- "perm": "rw"
- },
- {
- "cmd": "mgr self-test background start name=workload,type=CephString",
- "desc": "Activate a background workload (one of {0})".format(
- ", ".join(WORKLOADS)),
- "perm": "rw"
- },
- {
- "cmd": "mgr self-test background stop",
- "desc": "Stop background workload if any is running",
- "perm": "rw"
- },
- {
- "cmd": "mgr self-test config get name=key,type=CephString",
- "desc": "Peek at a configuration value",
- "perm": "rw"
- },
- {
- "cmd": "mgr self-test config get_localized name=key,type=CephString",
- "desc": "Peek at a configuration value (localized variant)",
- "perm": "rw"
- },
- {
- "cmd": "mgr self-test remote",
- "desc": "Test inter-module calls",
- "perm": "rw"
- },
- {
- "cmd": "mgr self-test module name=module,type=CephString",
- "desc": "Run another module's self_test() method",
- "perm": "rw"
- },
- {
- "cmd": "mgr self-test health set name=checks,type=CephString",
- "desc": "Set a health check from a JSON-formatted description.",
- "perm": "rw"
- },
- {
- "cmd": "mgr self-test health clear name=checks,type=CephString,n=N,req=False",
- "desc": "Clear health checks by name. If no names provided, clear all.",
- "perm": "rw"
- },
- {
- "cmd": "mgr self-test insights_set_now_offset name=hours,type=CephString",
- "desc": "Set the now time for the insights module.",
- "perm": "rw"
- },
- {
- "cmd": "mgr self-test cluster-log name=channel,type=CephString "
- "name=priority,type=CephString "
- "name=message,type=CephString",
- "desc": "Create an audit log record.",
- "perm": "rw"
- },
- {
- "cmd": "mgr self-test python-version",
- "desc": "Query the version of the embedded Python runtime",
- "perm": "r"
- },
- ]
-
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self._event = threading.Event()
self._workload = None
self._health = {}
- def handle_command(self, inbuf, command):
- if command['prefix'] == 'mgr self-test python-version':
- major = sys.version_info.major
- minor = sys.version_info.minor
- micro = sys.version_info.micro
- return 0, f'{major}.{minor}.{micro}', ''
-
- elif command['prefix'] == 'mgr self-test run':
- self._self_test()
- return 0, '', 'Self-test succeeded'
-
- elif command['prefix'] == 'mgr self-test background start':
- if command['workload'] not in self.WORKLOADS:
- return (-errno.EINVAL, '',
- "Workload not found '{0}'".format(command['workload']))
- self._workload = command['workload']
+ @CLICommand('mgr self-test python-version', perm='r')
+ def python_version(self) -> Tuple[int, str, str]:
+ '''
+ Query the version of the embedded Python runtime
+ '''
+ major = sys.version_info.major
+ minor = sys.version_info.minor
+ micro = sys.version_info.micro
+ return 0, f'{major}.{minor}.{micro}', ''
+
+ @CLICommand('mgr self-test run')
+ def run(self) -> Tuple[int, str, str]:
+ '''
+ Run mgr python interface tests
+ '''
+ self._self_test()
+ return 0, '', 'Self-test succeeded'
+
+ @CLICommand('mgr self-test background start')
+ def backgroun_start(self, workload: str) -> Tuple[int, str, str]:
+ '''
+ Activate a background workload (one of command_spam, throw_exception)
+ '''
+ self._workload = workload
+ self._event.set()
+ return 0, '', 'Running `{0}` in background'.format(self._workload)
+
+ @CLICommand('mgr self-test background stop')
+ def background_stop(self) -> Tuple[int, str, str]:
+ '''
+ Stop background workload if any is running
+ '''
+ if self._workload:
+ was_running = self._workload
+ self._workload = None
self._event.set()
- return 0, '', 'Running `{0}` in background'.format(self._workload)
-
- elif command['prefix'] == 'mgr self-test background stop':
- if self._workload:
- was_running = self._workload
- self._workload = None
- self._event.set()
- return 0, '', 'Stopping background workload `{0}`'.format(
- was_running)
- else:
- return 0, '', 'No background workload was running'
- elif command['prefix'] == 'mgr self-test config get':
- return 0, str(self.get_module_option(command['key'])), ''
- elif command['prefix'] == 'mgr self-test config get_localized':
- return 0, str(self.get_localized_module_option(command['key'])), ''
- elif command['prefix'] == 'mgr self-test remote':
- self._test_remote_calls()
- return 0, '', 'Successfully called'
- elif command['prefix'] == 'mgr self-test module':
- try:
- r = self.remote(command['module'], "self_test")
- except RuntimeError as e:
- return -1, '', "Test failed: {0}".format(e)
- else:
- return 0, str(r), "Self-test OK"
- elif command['prefix'] == 'mgr self-test health set':
- return self._health_set(inbuf, command)
- elif command['prefix'] == 'mgr self-test health clear':
- return self._health_clear(inbuf, command)
- elif command['prefix'] == 'mgr self-test insights_set_now_offset':
- return self._insights_set_now_offset(inbuf, command)
- elif command['prefix'] == 'mgr self-test cluster-log':
- priority_map = {
- 'info': self.ClusterLogPrio.INFO,
- 'security': self.ClusterLogPrio.SEC,
- 'warning': self.ClusterLogPrio.WARN,
- 'error': self.ClusterLogPrio.ERROR
- }
- self.cluster_log(command['channel'],
- priority_map[command['priority']],
- command['message'])
- return 0, '', 'Successfully called'
+ return 0, '', 'Stopping background workload `{0}`'.format(
+ was_running)
else:
- return (-errno.EINVAL, '',
- "Command not found '{0}'".format(command['prefix']))
-
- def _health_set(self, inbuf, command):
+ return 0, '', 'No background workload was running'
+
+ @CLICommand('mgr self-test config get')
+ def config_get(self, key: str) -> Tuple[int, str, str]:
+ '''
+ Peek at a configuration value
+ '''
+ return 0, str(self.get_module_option(key)), ''
+
+ @CLICommand('mgr self-test config get_localized')
+ def config_get_localized(self, key: str) -> Tuple[int, str, str]:
+ '''
+ Peek at a configuration value (localized variant)
+ '''
+ return 0, str(self.get_localized_module_option(key)), ''
+
+ @CLICommand('mgr self-test remote')
+ def test_remote(self) -> Tuple[int, str, str]:
+ '''
+ Test inter-module calls
+ '''
+ self._test_remote_calls()
+ return 0, '', 'Successfully called'
+
+ @CLICommand('mgr self-test module')
+ def module(self, module: str) -> Tuple[int, str, str]:
+ '''
+ Run another module's self_test() method
+ '''
try:
- checks = json.loads(command["checks"])
+ r = self.remote(module, "self_test")
+ except RuntimeError as e:
+ return -1, '', "Test failed: {0}".format(e)
+ else:
+ return 0, str(r), "Self-test OK"
+
+ @CLICommand('mgr self-test cluster-log')
+ def do_cluster_log(self,
+ channel: str,
+ priority: str,
+ message: str) -> Tuple[int, str, str]:
+ '''
+ Create an audit log record.
+ '''
+ priority_map = {
+ 'info': self.ClusterLogPrio.INFO,
+ 'security': self.ClusterLogPrio.SEC,
+ 'warning': self.ClusterLogPrio.WARN,
+ 'error': self.ClusterLogPrio.ERROR
+ }
+ self.cluster_log(channel,
+ priority_map[priority],
+ message)
+ return 0, '', 'Successfully called'
+
+ @CLICommand('mgr self-test health set')
+ def health_set(self, checks: str) -> Tuple[int, str, str]:
+ '''
+ Set a health check from a JSON-formatted description.
+ '''
+ try:
+ health_check = json.loads(checks)
except Exception as e:
return -1, "", "Failed to decode JSON input: {}".format(e)
try:
- for check, info in checks.items():
+ for check, info in health_check.items():
self._health[check] = {
"severity": str(info["severity"]),
"summary": str(info["summary"]),
self.set_health_checks(self._health)
return 0, "", ""
- def _health_clear(self, inbuf, command):
- if "checks" in command:
- for check in command["checks"]:
+ @CLICommand('mgr self-test health clear')
+ def health_clear(self, checks: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Clear health checks by name. If no names provided, clear all.
+ '''
+ if checks is not None:
+ for check in checks:
if check in self._health:
del self._health[check]
else:
self.set_health_checks(self._health)
return 0, "", ""
- def _insights_set_now_offset(self, inbuf, command):
- try:
- hours = int(command["hours"])
- except Exception as e:
- return -1, "", "Timestamp must be numeric: {}".format(e)
-
+ @CLICommand('mgr self-test insights_set_now_offset')
+ def insights_set_now_offset(self, hours: int) -> Tuple[int, str, str]:
+ '''
+ Set the now time for the insights module.
+ '''
self.remote("insights", "testing_set_now_time_offset", hours)
return 0, "", ""