From 00ab8e8edc291c860673612b2470a8b929e4f485 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sat, 16 Jan 2021 23:14:07 +0800 Subject: [PATCH] pybind/mgr/mgr_module: annotate more methods Signed-off-by: Kefu Chai (cherry picked from commit 8da98e55b96b1e31e2b0a0e20134a7a2ed9ad9a3) --- src/pybind/mgr/cephadm/module.py | 1 + src/pybind/mgr/mgr_module.py | 360 +++++++++++++++++-------------- 2 files changed, 205 insertions(+), 156 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index fd8664e2231d4..f8e759dabc2b1 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -367,6 +367,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, path = self.get_ceph_option('cephadm_path') try: + assert isinstance(path, str) with open(path, 'r') as f: self._cephadm = f.read() except (IOError, TypeError) as e: diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index 5d1ed2afb1157..50e750c647b58 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -1,7 +1,7 @@ import ceph_module # noqa -from typing import Tuple, Any, Dict, Generic, Optional, Callable, List, \ - Union, TYPE_CHECKING +from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \ + Sequence, Union, TYPE_CHECKING if TYPE_CHECKING: import sys if sys.version_info >= (3, 8): @@ -83,7 +83,7 @@ class CommandResult(object): Use with MgrModule.send_command """ - def __init__(self, tag=None): + def __init__(self, tag: Optional[str] = None): self.ev = threading.Event() self.outs = "" self.outb = "" @@ -93,13 +93,13 @@ class CommandResult(object): # C++ land, to avoid passing addresses around in messages. self.tag = tag if tag else "" - def complete(self, r, outb, outs): + def complete(self, r: int, outb: str, outs: str) -> None: self.r = r self.outb = outb self.outs = outs self.ev.set() - def wait(self): + def wait(self) -> Tuple[int, str, str]: self.ev.wait() return self.r, self.outb, self.outs @@ -298,10 +298,16 @@ class CRUSHMap(ceph_module.BasePyCRUSH): return dict(result) +HandlerFuncType = Callable[..., Tuple[int, str, str]] + + class CLICommand(object): COMMANDS = {} # type: Dict[str, CLICommand] - def __init__(self, prefix, perm="rw", poll=False): + def __init__(self, + prefix: str, + perm: str = 'rw', + poll: bool = False): self.prefix = prefix self.perm = perm self.poll = poll @@ -312,7 +318,7 @@ class CLICommand(object): KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return' @staticmethod - def load_func_metadata(f): + def load_func_metadata(f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]: desc = inspect.getdoc(f) or '' full_argspec = inspect.getfullargspec(f) arg_spec = full_argspec.annotations @@ -331,24 +337,24 @@ class CLICommand(object): has_default)) return desc, arg_spec, first_default, ' '.join(args) - def store_func_metadata(self, f): + def store_func_metadata(self, f: HandlerFuncType) -> None: self.desc, self.arg_spec, self.first_default, self.args = \ self.load_func_metadata(f) - def __call__(self, func): + def __call__(self, func: HandlerFuncType) -> HandlerFuncType: self.store_func_metadata(func) self.func = func self.COMMANDS[self.prefix] = self return self.func - def _get_arg_value(self, kwargs_switch, key, val): - def start_kwargs(): + def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]: + def start_kwargs() -> bool: if isinstance(val, str) and '=' in val: k, v = val.split('=', 1) if k in self.arg_spec: return True - else: - return False + return False + if not kwargs_switch: kwargs_switch = start_kwargs() @@ -358,7 +364,7 @@ class CLICommand(object): k, v = key, val return kwargs_switch, k.replace('-', '_'), v - def _collect_args_by_argspec(self, cmd_dict): + def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Dict[str, Any]: kwargs = {} kwargs_switch = False for index, (name, tp) in enumerate(self.arg_spec.items()): @@ -374,14 +380,14 @@ class CLICommand(object): kwargs[k] = CephArgtype.cast_to(tp, v) return kwargs - def call(self, mgr, cmd_dict, inbuf): + def call(self, mgr: Any, cmd_dict: Dict[str, Any], inbuf: Optional[str] = None) -> HandleCommandResult: kwargs = self._collect_args_by_argspec(cmd_dict) if inbuf: kwargs['inbuf'] = inbuf assert self.func return self.func(mgr, **kwargs) - def dump_cmd(self): + def dump_cmd(self) -> Dict[str, Union[str, bool]]: return { 'cmd': '{} {}'.format(self.prefix, self.args), 'desc': self.desc, @@ -390,21 +396,21 @@ class CLICommand(object): } @classmethod - def dump_cmd_list(cls): + def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]: return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()] -def CLIReadCommand(prefix, poll=False): +def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand: return CLICommand(prefix, "r", poll) -def CLIWriteCommand(prefix, poll=False): +def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand: return CLICommand(prefix, "w", poll) -def CLICheckNonemptyFileInput(func): +def CLICheckNonemptyFileInput(func: HandlerFuncType) -> HandlerFuncType: @functools.wraps(func) - def check(*args, **kwargs): + def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]: if 'inbuf' not in kwargs: return -errno.EINVAL, '', ERROR_MSG_NO_INPUT_FILE if not kwargs['inbuf'] or (isinstance(kwargs['inbuf'], str) @@ -415,7 +421,7 @@ def CLICheckNonemptyFileInput(func): return check -def _get_localized_key(prefix, key): +def _get_localized_key(prefix: str, key: str) -> str: return '{}/{}'.format(prefix, key) @@ -473,10 +479,10 @@ class Command(dict): def __init__( self, - prefix, - handler, - perm="rw", - poll=False, + prefix: str, + handler: HandlerFuncType, + perm: str = "rw", + poll: bool = False, ): super().__init__(perm=perm, poll=poll) @@ -484,15 +490,16 @@ class Command(dict): self.handler = handler @staticmethod - def returns_command_result(instance, f): + def returns_command_result(instance: Any, + f: HandlerFuncType) -> Callable[..., HandleCommandResult]: @functools.wraps(f) - def wrapper(mgr, *args, **kwargs): + def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult: retval, stdout, stderr = f(instance or mgr, *args, **kwargs) return HandleCommandResult(retval, stdout, stderr) wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined] return wrapper - def register(self, instance=False): + def register(self, instance: bool = False) -> HandlerFuncType: """ Register a CLICommand handler. It allows an instance to register bound methods. In that case, the mgr instance is not passed, and it's expected @@ -505,24 +512,24 @@ class Command(dict): class CPlusPlusHandler(logging.Handler): - def __init__(self, module_inst): + def __init__(self, module_inst: Any): super(CPlusPlusHandler, self).__init__() self._module = module_inst self.setFormatter(logging.Formatter("[{} %(levelname)-4s %(name)s] %(message)s" .format(module_inst.module_name))) - def emit(self, record): + def emit(self, record: logging.LogRecord) -> None: if record.levelno >= self.level: self._module._ceph_log(self.format(record)) class ClusterLogHandler(logging.Handler): - def __init__(self, module_inst): + def __init__(self, module_inst: Any): super().__init__() self._module = module_inst self.setFormatter(logging.Formatter("%(message)s")) - def emit(self, record): + def emit(self, record: logging.LogRecord) -> None: levelmap = { 'DEBUG': MgrModule.ClusterLogPrio.DEBUG, 'INFO': MgrModule.ClusterLogPrio.INFO, @@ -538,7 +545,7 @@ class ClusterLogHandler(logging.Handler): class FileHandler(logging.FileHandler): - def __init__(self, module_inst): + def __init__(self, module_inst: Any): path = module_inst.get_ceph_option("log_file") idx = path.rfind(".log") if idx != -1: @@ -550,10 +557,14 @@ class FileHandler(logging.FileHandler): class MgrModuleLoggingMixin(object): - def _configure_logging(self, mgr_level, module_level, cluster_level, - log_to_file, log_to_cluster): - self._mgr_level = None - self._module_level = None + def _configure_logging(self, + mgr_level: str, + module_level: str, + cluster_level: str, + log_to_file: bool, + log_to_cluster: bool) -> None: + self._mgr_level: Optional[str] = None + self._module_level: Optional[str] = None self._root_logger = logging.getLogger() self._unconfigure_logging() @@ -575,7 +586,7 @@ class MgrModuleLoggingMixin(object): self._root_logger.setLevel(logging.NOTSET) self._set_log_level(mgr_level, module_level, cluster_level) - def _unconfigure_logging(self): + def _unconfigure_logging(self) -> None: # remove existing handlers: rm_handlers = [ h for h in self._root_logger.handlers @@ -587,7 +598,10 @@ class MgrModuleLoggingMixin(object): self.log_to_file = False self.log_to_cluster = False - def _set_log_level(self, mgr_level, module_level, cluster_level): + def _set_log_level(self, + mgr_level: str, + module_level: str, + cluster_level: str) -> None: self._cluster_log_handler.setLevel(cluster_level.upper()) module_level = module_level.upper() if module_level else '' @@ -619,34 +633,34 @@ class MgrModuleLoggingMixin(object): self._mgr_log_handler.setLevel(level) self._file_log_handler.setLevel(level) - def _enable_file_log(self): + def _enable_file_log(self) -> None: # enable file log self.getLogger().warning("enabling logging to file") self.log_to_file = True self._root_logger.addHandler(self._file_log_handler) - def _disable_file_log(self): + def _disable_file_log(self) -> None: # disable file log self.getLogger().warning("disabling logging to file") self.log_to_file = False self._root_logger.removeHandler(self._file_log_handler) - def _enable_cluster_log(self): + def _enable_cluster_log(self) -> None: # enable cluster log self.getLogger().warning("enabling logging to cluster") self.log_to_cluster = True self._root_logger.addHandler(self._cluster_log_handler) - def _disable_cluster_log(self): + def _disable_cluster_log(self) -> None: # disable cluster log self.getLogger().warning("disabling logging to cluster") self.log_to_cluster = False self._root_logger.removeHandler(self._cluster_log_handler) - def _ceph_log_level_to_python(self, ceph_log_level): - if ceph_log_level: + def _ceph_log_level_to_python(self, log_level: str) -> str: + if log_level: try: - ceph_log_level = int(ceph_log_level.split("/", 1)[0]) + ceph_log_level = int(log_level.split("/", 1)[0]) except ValueError: ceph_log_level = 0 else: @@ -661,7 +675,7 @@ class MgrModuleLoggingMixin(object): log_level = "INFO" return log_level - def getLogger(self, name=None): + def getLogger(self, name: Optional[str] = None) -> logging.Logger: return logging.getLogger(name) @@ -678,7 +692,7 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin): MODULE_OPTIONS: List[Option] = [] MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any] - def __init__(self, module_name, capsule): + def __init__(self, module_name: str, capsule: Any): super(MgrStandbyModule, self).__init__(capsule) self.module_name = module_name @@ -690,24 +704,25 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin): else: self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default']) - mgr_level = self.get_ceph_option("debug_mgr") - log_level = self.get_module_option("log_level") - cluster_level = self.get_module_option('log_to_cluster_level') + # mock does not return a str + mgr_level = cast(str, self.get_ceph_option("debug_mgr")) + log_level = cast(str, self.get_module_option("log_level")) + cluster_level = cast(str, self.get_module_option('log_to_cluster_level')) self._configure_logging(mgr_level, log_level, cluster_level, False, False) # for backwards compatibility self._logger = self.getLogger() - def __del__(self): + def __del__(self) -> None: self._cleanup() self._unconfigure_logging() - def _cleanup(self): + def _cleanup(self) -> None: pass @classmethod - def _register_options(cls, module_name): + def _register_options(cls, module_name: str) -> None: cls.MODULE_OPTIONS.append( Option(name='log_level', type='str', default="", runtime=True, enum_allowed=['info', 'debug', 'critical', 'error', @@ -725,17 +740,17 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin): 'warning', ''])) @property - def log(self): + def log(self) -> logging.Logger: return self._logger - def serve(self): + def serve(self) -> None: """ The serve method is mandatory for standby modules. :return: """ raise NotImplementedError() - def get_mgr_id(self): + def get_mgr_id(self) -> str: return self._ceph_get_mgr_id() def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue: @@ -750,10 +765,10 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin): else: return r - def get_ceph_option(self, key): + def get_ceph_option(self, key: str) -> OptionValue: return self._ceph_get_option(key) - def get_store(self, key): + def get_store(self, key: str) -> Optional[str]: """ Retrieve the value of a persistent KV store entry @@ -762,7 +777,7 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin): """ return self._ceph_get_store(key) - def get_active_uri(self): + def get_active_uri(self) -> str: return self._ceph_get_active_uri() def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue: @@ -807,7 +822,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): WARN = 3 ERROR = 4 - def __init__(self, module_name, py_modules_ptr, this_ptr): + def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object): self.module_name = module_name super(MgrModule, self).__init__(py_modules_ptr, this_ptr) @@ -823,11 +838,13 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): # with default and user-supplied option values. self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default']) - mgr_level = self.get_ceph_option("debug_mgr") - log_level = self.get_module_option("log_level") - cluster_level = self.get_module_option('log_to_cluster_level') + mgr_level = cast(str, self.get_ceph_option("debug_mgr")) + log_level = cast(str, self.get_module_option("log_level")) + cluster_level = cast(str, self.get_module_option('log_to_cluster_level')) log_to_file = self.get_module_option("log_to_file") + assert isinstance(log_to_file, bool) log_to_cluster = self.get_module_option("log_to_cluster") + assert isinstance(log_to_cluster, bool) self._configure_logging(mgr_level, log_level, cluster_level, log_to_file, log_to_cluster) @@ -839,13 +856,13 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): self._perf_schema_cache = None # Keep a librados instance for those that need it. - self._rados = None + self._rados: Optional[rados.Rados] = None - def __del__(self): + def __del__(self) -> None: self._unconfigure_logging() @classmethod - def _register_options(cls, module_name): + def _register_options(cls, module_name: str) -> None: cls.MODULE_OPTIONS.append( Option(name='log_level', type='str', default="", runtime=True, enum_allowed=['info', 'debug', 'critical', 'error', @@ -863,14 +880,14 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): 'warning', ''])) @classmethod - def _register_commands(cls, module_name): + def _register_commands(cls, module_name: str) -> None: cls.COMMANDS.extend(CLICommand.dump_cmd_list()) @property - def log(self): + def log(self) -> logging.Logger: return self._logger - def cluster_log(self, channel, priority, message): + def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None: """ :param channel: The log channel. This can be 'cluster', 'audit', ... :param priority: The log message priority. @@ -879,11 +896,11 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): self._ceph_cluster_log(channel, priority.value, message) @property - def version(self): + def version(self) -> str: return self._version @property - def release_name(self): + def release_name(self) -> str: """ Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'. :return: Returns the release name of the Ceph version in lower case. @@ -891,13 +908,13 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_release_name() - def get_context(self): + def get_context(self) -> object: """ :return: a Python capsule containing a C++ CephContext pointer """ return self._ceph_get_context() - def notify(self, notify_type, notify_id): + def notify(self, notify_type: str, notify_id: str) -> None: """ Called by the ceph-mgr service to notify the Python plugin that new state is available. @@ -912,14 +929,16 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ pass - def _config_notify(self): + def _config_notify(self) -> None: # check logging options for changes - mgr_level = self.get_ceph_option("debug_mgr") - module_level = self.get_module_option("log_level") - cluster_level = self.get_module_option("log_to_cluster_level") + mgr_level = cast(str, self.get_ceph_option("debug_mgr")) + module_level = cast(str, self.get_module_option("log_level")) + cluster_level = cast(str, self.get_module_option("log_to_cluster_level")) + assert isinstance(cluster_level, str) log_to_file = self.get_module_option("log_to_file", False) + assert isinstance(log_to_file, bool) log_to_cluster = self.get_module_option("log_to_cluster", False) - + assert isinstance(log_to_cluster, bool) self._set_log_level(mgr_level, module_level, cluster_level) if log_to_file != self.log_to_file: @@ -936,7 +955,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): # call module subclass implementations self.config_notify() - def config_notify(self): + def config_notify(self) -> None: """ Called by the ceph-mgr service to notify the Python plugin that the configuration may have changed. Modules will want to @@ -944,7 +963,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ pass - def serve(self): + def serve(self) -> None: """ Called by the ceph-mgr service to start any server that is provided by this Python plugin. The implementation @@ -954,7 +973,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ pass - def shutdown(self): + def shutdown(self) -> None: """ Called by the ceph-mgr service to request that this module drop out of its serve() function. You do not @@ -967,7 +986,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): self._rados.shutdown() self._ceph_unregister_client(addrs) - def get(self, data_name): + def get(self, data_name: str): """ Called by the plugin to fetch named cluster-wide objects from ceph-mgr. @@ -983,7 +1002,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get(data_name) - def _stattype_to_str(self, stattype): + def _stattype_to_str(self, stattype: int) -> str: typeonly = stattype & self.PERFCOUNTER_TYPE_MASK if typeonly == 0: @@ -1018,21 +1037,23 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return path, label_names, labels, - def _perfvalue_to_value(self, stattype, value): + def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]: if stattype & self.PERFCOUNTER_TIME: # Convert from ns to seconds return value / 1000000000.0 else: return value - def _unit_to_str(self, unit): + def _unit_to_str(self, unit: int) -> str: if unit == self.NONE: return "/s" elif unit == self.BYTES: return "B/s" + else: + raise ValueError(f'bad unit "{unit}"') @staticmethod - def to_pretty_iec(n): + def to_pretty_iec(n: int) -> str: for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'), (20, 'Mi'), (10, 'Ki')]: if n > 10 << bits: @@ -1040,7 +1061,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return str(n) + ' ' @staticmethod - def get_pretty_row(elems, width): + def get_pretty_row(elems: Sequence[str], width: int) -> str: """ Takes an array of elements and returns a string with those elements formatted as a table row. Useful for polling modules. @@ -1057,7 +1078,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return ret - def get_pretty_header(self, elems, width): + def get_pretty_header(self, elems: Sequence[str], width: int) -> str: """ Like ``get_pretty_row`` but adds dashes, to be used as a table title. @@ -1085,7 +1106,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return ret - def get_server(self, hostname): + def get_server(self, hostname: str) -> Union[Dict[str, str], + List[Dict[str, str]]]: """ Called by the plugin to fetch metadata about a particular hostname from ceph-mgr. @@ -1097,7 +1119,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_server(hostname) - def get_perf_schema(self, svc_type, svc_name): + def get_perf_schema(self, + svc_type: str, + svc_name: str) -> Dict[str, + Dict[str, Dict[str, Union[str, int]]]]: """ Called by the plugin to fetch perf counter schema info. svc_name can be nullptr, as can svc_type, in which case @@ -1109,7 +1134,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_perf_schema(svc_type, svc_name) - def get_counter(self, svc_type, svc_name, path): + def get_counter(self, + svc_type: str, + svc_name: str, + path: str) -> List[Tuple[float, int]]: """ Called by the plugin to fetch the latest performance counter data for a particular counter on a particular service. @@ -1123,21 +1151,26 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_counter(svc_type, svc_name, path) - def get_latest_counter(self, svc_type, svc_name, path): + def get_latest_counter(self, + svc_type: str, + svc_name: str, + path: str) -> Dict[str, Union[Tuple[float, int], + Tuple[float, int, int]]]: """ Called by the plugin to fetch only the newest performance counter data - pointfor a particular counter on a particular service. + point for a particular counter on a particular service. :param str svc_type: :param str svc_name: :param str path: a period-separated concatenation of the subsystem and the counter name, for example "mds.inodes". - :return: A list of two-tuples of (timestamp, value) is returned. This may be - empty if no data is available. + :return: A list of two-tuples of (timestamp, value) or three-tuple of + (timestamp, value, count) is returned. This may be empty if no + data is available. """ return self._ceph_get_latest_counter(svc_type, svc_name, path) - def list_servers(self): + def list_servers(self) -> List[Dict[str, List[Dict[str, str]]]]: """ Like ``get_server``, but gives information about all servers (i.e. all unique hostnames that have been mentioned in daemon metadata) @@ -1147,7 +1180,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_server(None) - def get_metadata(self, svc_type, svc_id, default=None): + def get_metadata(self, + svc_type: str, + svc_id: str, + default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]: """ Fetch the daemon metadata for a particular service. @@ -1165,7 +1201,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return default return metadata - def get_daemon_status(self, svc_type, svc_id): + def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]: """ Fetch the latest status for a particular service daemon. @@ -1189,7 +1225,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}') return r - def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None): + def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]: """ Helper for modules that do simple, synchronous mon command execution. @@ -1218,7 +1254,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): svc_id: str, command: str, tag: str, - inbuf: Optional[str] = None): + inbuf: Optional[str] = None) -> None: """ Called by the plugin to send a command to the mon cluster. @@ -1242,7 +1278,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf) - def set_health_checks(self, checks): + def set_health_checks(self, checks: Dict[str, Dict[str, Sequence[str]]]) -> None: """ Set the module's current map of health checks. Argument is a dict of check names to info, in this form: @@ -1267,13 +1303,19 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ self._ceph_set_health_checks(checks) - def _handle_command(self, inbuf: str, cmd: Dict[str, Any]): + def _handle_command(self, + inbuf: str, + cmd: Dict[str, Any]) -> Union[HandleCommandResult, + Tuple[int, str, str]]: if cmd['prefix'] not in CLICommand.COMMANDS: return self.handle_command(inbuf, cmd) return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf) - def handle_command(self, inbuf: str, cmd: Dict[str, Any]): + def handle_command(self, + inbuf: str, + cmd: Dict[str, Any]) -> Union[HandleCommandResult, + Tuple[int, str, str]]: """ Called by ceph-mgr to request the plugin to handle one of the commands that it declared in self.COMMANDS @@ -1294,7 +1336,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): # any ``COMMANDS`` raise NotImplementedError() - def get_mgr_id(self): + def get_mgr_id(self) -> str: """ Retrieve the name of the manager daemon where this plugin is currently being executed (i.e. the active manager). @@ -1303,10 +1345,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_mgr_id() - def get_ceph_option(self, key): + def get_ceph_option(self, key: str) -> OptionValue: return self._ceph_get_option(key) - def _validate_module_option(self, key): + def _validate_module_option(self, key: str) -> None: """ Helper: don't allow get/set config callers to access config options that they didn't declare @@ -1316,7 +1358,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS". format(key, self.__class__.__name__)) - def _get_module_option(self, key, default, localized_prefix=""): + def _get_module_option(self, + key: str, + default: OptionValue, + localized_prefix: str = "") -> OptionValue: r = self._ceph_get_module_option(self.module_name, key, localized_prefix) if r is None: @@ -1349,7 +1394,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): r = self._ceph_get_module_option(module, key) return default if r is None else r - def get_store_prefix(self, key_prefix): + def get_store_prefix(self, key_prefix: str) -> Dict[str, str]: """ Retrieve a dict of KV store keys to values, where the keys have the given prefix @@ -1359,7 +1404,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_store_prefix(key_prefix) - def _set_localized(self, key, val, setter): + def _set_localized(self, + key: str, + val: Optional[str], + setter: Callable[[str, Optional[str]], None]) -> None: return setter(_get_localized_key(self.get_mgr_id(), key), val) def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue: @@ -1369,11 +1417,11 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): self._validate_module_option(key) return self._get_module_option(key, default, self.get_mgr_id()) - def _set_module_option(self, key, val): + def _set_module_option(self, key: str, val: Any) -> None: return self._ceph_set_module_option(self.module_name, key, None if val is None else str(val)) - def set_module_option(self, key, val): + def set_module_option(self, key: str, val: Any) -> None: """ Set the value of a persistent configuration setting @@ -1383,7 +1431,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): self._validate_module_option(key) return self._set_module_option(key, val) - def set_module_option_ex(self, module, key, val): + def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None: """ Set the value of a persistent configuration setting for the specified module. @@ -1396,7 +1444,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): self._validate_module_option(key) return self._ceph_set_module_option(module, key, str(val)) - def set_localized_module_option(self, key, val): + def set_localized_module_option(self, key: str, val: Optional[str]) -> None: """ Set localized configuration for this ceph-mgr instance :param str key: @@ -1406,17 +1454,14 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): self._validate_module_option(key) return self._set_localized(key, val, self._set_module_option) - def set_store(self, key, val): + def set_store(self, key: str, val: Optional[str]) -> None: """ Set a value in this module's persistent key value store. If val is None, remove key from store - - :param str key: - :param str val: """ self._ceph_set_store(key, val) - def get_store(self, key, default=None): + def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]: """ Get a value from this module's persistent key value store """ @@ -1426,7 +1471,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): else: return r - def get_localized_store(self, key, default=None): + def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]: r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key)) if r is None: r = self._ceph_get_store(key) @@ -1434,10 +1479,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): r = default return r - def set_localized_store(self, key, val): + def set_localized_store(self, key: str, val: Optional[str]) -> None: return self._set_localized(key, val, self.set_store) - def self_test(self): + def self_test(self) -> None: """ Run a self-test on the module. Override this function and implement a best as possible self-test for (automated) testing of the module @@ -1451,7 +1496,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ pass - def get_osdmap(self): + def get_osdmap(self) -> OSDMap: """ Get a handle to an OSDMap. If epoch==0, get a handle for the latest OSDMap. @@ -1459,7 +1504,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_osdmap() - def get_latest(self, daemon_type, daemon_name, counter): + def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int: data = self.get_latest_counter( daemon_type, daemon_name, counter)[counter] if data: @@ -1467,18 +1512,21 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): else: return 0 - def get_latest_avg(self, daemon_type, daemon_name, counter): + def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]: data = self.get_latest_counter( daemon_type, daemon_name, counter)[counter] if data: - return data[1], data[2] + # https://github.com/python/mypy/issues/1178 + _, value, count = cast(Tuple[float, int, int], data) + return value, count else: return 0, 0 @profile_method() - def get_all_perf_counters(self, prio_limit=PRIO_USEFUL, - services=("mds", "mon", "osd", - "rbd-mirror", "rgw", "tcmu-runner")): + def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL, + services: Sequence[str] = ("mds", "mon", "osd", + "rbd-mirror", "rgw", + "tcmu-runner")) -> Dict[str, dict]: """ Return the perf counters currently known to this ceph-mgr instance, filtered by priority equal to or greater than `prio_limit`. @@ -1498,8 +1546,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): if service['type'] not in services: continue - schema = self.get_perf_schema(service['type'], service['id']) - if not schema: + schemas = self.get_perf_schema(service['type'], service['id']) + if not schemas: self.log.warning("No perf counter schema for {0}.{1}".format( service['type'], service['id'] )) @@ -1509,20 +1557,23 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): # get just the service we're asking about svc_full_name = "{0}.{1}".format( service['type'], service['id']) - schema = schema[svc_full_name] + schema = schemas[svc_full_name] # Populate latest values for counter_path, counter_schema in schema.items(): # self.log.debug("{0}: {1}".format( # counter_path, json.dumps(counter_schema) # )) - if counter_schema['priority'] < prio_limit: + priority = counter_schema['priority'] + assert isinstance(priority, int) + if priority < prio_limit: continue + tp = counter_schema['type'] + assert isinstance(tp, int) counter_info = dict(counter_schema) - # Also populate count for the long running avgs - if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG: + if tp & self.PERFCOUNTER_LONGRUNAVG: v, c = self.get_latest_avg( service['type'], service['id'], @@ -1543,7 +1594,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return result - def set_uri(self, uri): + def set_uri(self, uri: str) -> None: """ If the module exposes a service, then call this to publish the address once it is available. @@ -1552,7 +1603,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_set_uri(uri) - def have_mon_connection(self): + def have_mon_connection(self) -> bool: """ Check whether this ceph-mgr daemon has an open connection to a monitor. If it doesn't, then it's likely that the @@ -1562,20 +1613,17 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return self._ceph_have_mon_connection() - def update_progress_event(self, evid, desc, progress, add_to_ceph_s): - return self._ceph_update_progress_event(str(evid), - str(desc), - float(progress), - bool(add_to_ceph_s)) + def update_progress_event(self, evid: str, desc: str, progress: float, add_to_ceph_s: bool) -> None: + return self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s) - def complete_progress_event(self, evid): - return self._ceph_complete_progress_event(str(evid)) + def complete_progress_event(self, evid: str) -> None: + return self._ceph_complete_progress_event(evid) - def clear_all_progress_events(self): + def clear_all_progress_events(self) -> None: return self._ceph_clear_all_progress_events() @property - def rados(self): + def rados(self) -> rados.Rados: """ A librados instance to be shared by any classes within this mgr module that want one. @@ -1590,7 +1638,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return self._rados @staticmethod - def can_run(): + def can_run() -> Tuple[bool, str]: """ Implement this function to report whether the module's dependencies are met. For example, if the module needs to import a particular @@ -1605,7 +1653,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return True, "" - def remote(self, module_name, method_name, *args, **kwargs): + def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any: """ Invoke a method on another module. All arguments, and the return value from the other module must be serializable. @@ -1629,7 +1677,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return self._ceph_dispatch_remote(module_name, method_name, args, kwargs) - def add_osd_perf_query(self, query): + def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]: """ Register an OSD perf query. Argument is a dict of the query parameters, in this form: @@ -1659,7 +1707,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_add_osd_perf_query(query) - def remove_osd_perf_query(self, query_id): + def remove_osd_perf_query(self, query_id: int) -> None: """ Unregister an OSD perf query. @@ -1667,7 +1715,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_remove_osd_perf_query(query_id) - def get_osd_perf_counters(self, query_id): + def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, Any]]: """ Get stats collected for an OSD perf query. @@ -1675,7 +1723,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_osd_perf_counters(query_id) - def add_mds_perf_query(self, query): + def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]: """ Register an MDS perf query. Argument is a dict of the query parameters, in this form: @@ -1704,7 +1752,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_add_mds_perf_query(query) - def remove_mds_perf_query(self, query_id): + def remove_mds_perf_query(self, query_id: int) -> None: """ Unregister an MDS perf query. @@ -1712,7 +1760,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_remove_mds_perf_query(query_id) - def get_mds_perf_counters(self, query_id): + def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, Any]]: """ Get stats collected for an MDS perf query. @@ -1720,7 +1768,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ return self._ceph_get_mds_perf_counters(query_id) - def is_authorized(self, arguments): + def is_authorized(self, arguments: Dict[str, str]) -> bool: """ Verifies that the current session caps permit executing the py service or current module with the provided arguments. This provides a generic -- 2.39.5