import ceph_module # noqa
-from typing import Tuple, Any, Dict, Generic, Optional, Callable, List, \
- Union, TYPE_CHECKING
+from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
+ Sequence, Union, TYPE_CHECKING
if TYPE_CHECKING:
import sys
if sys.version_info >= (3, 8):
Use with MgrModule.send_command
"""
- def __init__(self, tag=None):
+ def __init__(self, tag: Optional[str] = None):
self.ev = threading.Event()
self.outs = ""
self.outb = ""
# C++ land, to avoid passing addresses around in messages.
self.tag = tag if tag else ""
- def complete(self, r, outb, outs):
+ def complete(self, r: int, outb: str, outs: str) -> None:
self.r = r
self.outb = outb
self.outs = outs
self.ev.set()
- def wait(self):
+ def wait(self) -> Tuple[int, str, str]:
self.ev.wait()
return self.r, self.outb, self.outs
return dict(result)
+HandlerFuncType = Callable[..., Tuple[int, str, str]]
+
+
class CLICommand(object):
COMMANDS = {} # type: Dict[str, CLICommand]
- def __init__(self, prefix, perm="rw", poll=False):
+ def __init__(self,
+ prefix: str,
+ perm: str = 'rw',
+ poll: bool = False):
self.prefix = prefix
self.perm = perm
self.poll = poll
KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return'
@staticmethod
- def load_func_metadata(f):
+ def load_func_metadata(f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
desc = inspect.getdoc(f) or ''
full_argspec = inspect.getfullargspec(f)
arg_spec = full_argspec.annotations
has_default))
return desc, arg_spec, first_default, ' '.join(args)
- def store_func_metadata(self, f):
+ def store_func_metadata(self, f: HandlerFuncType) -> None:
self.desc, self.arg_spec, self.first_default, self.args = \
self.load_func_metadata(f)
- def __call__(self, func):
+ def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
self.store_func_metadata(func)
self.func = func
self.COMMANDS[self.prefix] = self
return self.func
- def _get_arg_value(self, kwargs_switch, key, val):
- def start_kwargs():
+ def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]:
+ def start_kwargs() -> bool:
if isinstance(val, str) and '=' in val:
k, v = val.split('=', 1)
if k in self.arg_spec:
return True
- else:
- return False
+ return False
+
if not kwargs_switch:
kwargs_switch = start_kwargs()
k, v = key, val
return kwargs_switch, k.replace('-', '_'), v
- def _collect_args_by_argspec(self, cmd_dict):
+ def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Dict[str, Any]:
kwargs = {}
kwargs_switch = False
for index, (name, tp) in enumerate(self.arg_spec.items()):
kwargs[k] = CephArgtype.cast_to(tp, v)
return kwargs
- def call(self, mgr, cmd_dict, inbuf):
+ def call(self, mgr: Any, cmd_dict: Dict[str, Any], inbuf: Optional[str] = None) -> HandleCommandResult:
kwargs = self._collect_args_by_argspec(cmd_dict)
if inbuf:
kwargs['inbuf'] = inbuf
assert self.func
return self.func(mgr, **kwargs)
- def dump_cmd(self):
+ def dump_cmd(self) -> Dict[str, Union[str, bool]]:
return {
'cmd': '{} {}'.format(self.prefix, self.args),
'desc': self.desc,
}
@classmethod
- def dump_cmd_list(cls):
+ def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
-def CLIReadCommand(prefix, poll=False):
+def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
return CLICommand(prefix, "r", poll)
-def CLIWriteCommand(prefix, poll=False):
+def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
return CLICommand(prefix, "w", poll)
-def CLICheckNonemptyFileInput(func):
+def CLICheckNonemptyFileInput(func: HandlerFuncType) -> HandlerFuncType:
@functools.wraps(func)
- def check(*args, **kwargs):
+ def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
if 'inbuf' not in kwargs:
return -errno.EINVAL, '', ERROR_MSG_NO_INPUT_FILE
if not kwargs['inbuf'] or (isinstance(kwargs['inbuf'], str)
return check
-def _get_localized_key(prefix, key):
+def _get_localized_key(prefix: str, key: str) -> str:
return '{}/{}'.format(prefix, key)
def __init__(
self,
- prefix,
- handler,
- perm="rw",
- poll=False,
+ prefix: str,
+ handler: HandlerFuncType,
+ perm: str = "rw",
+ poll: bool = False,
):
super().__init__(perm=perm,
poll=poll)
self.handler = handler
@staticmethod
- def returns_command_result(instance, f):
+ def returns_command_result(instance: Any,
+ f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
@functools.wraps(f)
- def wrapper(mgr, *args, **kwargs):
+ def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
return HandleCommandResult(retval, stdout, stderr)
wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined]
return wrapper
- def register(self, instance=False):
+ def register(self, instance: bool = False) -> HandlerFuncType:
"""
Register a CLICommand handler. It allows an instance to register bound
methods. In that case, the mgr instance is not passed, and it's expected
class CPlusPlusHandler(logging.Handler):
- def __init__(self, module_inst):
+ def __init__(self, module_inst: Any):
super(CPlusPlusHandler, self).__init__()
self._module = module_inst
self.setFormatter(logging.Formatter("[{} %(levelname)-4s %(name)s] %(message)s"
.format(module_inst.module_name)))
- def emit(self, record):
+ def emit(self, record: logging.LogRecord) -> None:
if record.levelno >= self.level:
self._module._ceph_log(self.format(record))
class ClusterLogHandler(logging.Handler):
- def __init__(self, module_inst):
+ def __init__(self, module_inst: Any):
super().__init__()
self._module = module_inst
self.setFormatter(logging.Formatter("%(message)s"))
- def emit(self, record):
+ def emit(self, record: logging.LogRecord) -> None:
levelmap = {
'DEBUG': MgrModule.ClusterLogPrio.DEBUG,
'INFO': MgrModule.ClusterLogPrio.INFO,
class FileHandler(logging.FileHandler):
- def __init__(self, module_inst):
+ def __init__(self, module_inst: Any):
path = module_inst.get_ceph_option("log_file")
idx = path.rfind(".log")
if idx != -1:
class MgrModuleLoggingMixin(object):
- def _configure_logging(self, mgr_level, module_level, cluster_level,
- log_to_file, log_to_cluster):
- self._mgr_level = None
- self._module_level = None
+ def _configure_logging(self,
+ mgr_level: str,
+ module_level: str,
+ cluster_level: str,
+ log_to_file: bool,
+ log_to_cluster: bool) -> None:
+ self._mgr_level: Optional[str] = None
+ self._module_level: Optional[str] = None
self._root_logger = logging.getLogger()
self._unconfigure_logging()
self._root_logger.setLevel(logging.NOTSET)
self._set_log_level(mgr_level, module_level, cluster_level)
- def _unconfigure_logging(self):
+ def _unconfigure_logging(self) -> None:
# remove existing handlers:
rm_handlers = [
h for h in self._root_logger.handlers
self.log_to_file = False
self.log_to_cluster = False
- def _set_log_level(self, mgr_level, module_level, cluster_level):
+ def _set_log_level(self,
+ mgr_level: str,
+ module_level: str,
+ cluster_level: str) -> None:
self._cluster_log_handler.setLevel(cluster_level.upper())
module_level = module_level.upper() if module_level else ''
self._mgr_log_handler.setLevel(level)
self._file_log_handler.setLevel(level)
- def _enable_file_log(self):
+ def _enable_file_log(self) -> None:
# enable file log
self.getLogger().warning("enabling logging to file")
self.log_to_file = True
self._root_logger.addHandler(self._file_log_handler)
- def _disable_file_log(self):
+ def _disable_file_log(self) -> None:
# disable file log
self.getLogger().warning("disabling logging to file")
self.log_to_file = False
self._root_logger.removeHandler(self._file_log_handler)
- def _enable_cluster_log(self):
+ def _enable_cluster_log(self) -> None:
# enable cluster log
self.getLogger().warning("enabling logging to cluster")
self.log_to_cluster = True
self._root_logger.addHandler(self._cluster_log_handler)
- def _disable_cluster_log(self):
+ def _disable_cluster_log(self) -> None:
# disable cluster log
self.getLogger().warning("disabling logging to cluster")
self.log_to_cluster = False
self._root_logger.removeHandler(self._cluster_log_handler)
- def _ceph_log_level_to_python(self, ceph_log_level):
- if ceph_log_level:
+ def _ceph_log_level_to_python(self, log_level: str) -> str:
+ if log_level:
try:
- ceph_log_level = int(ceph_log_level.split("/", 1)[0])
+ ceph_log_level = int(log_level.split("/", 1)[0])
except ValueError:
ceph_log_level = 0
else:
log_level = "INFO"
return log_level
- def getLogger(self, name=None):
+ def getLogger(self, name: Optional[str] = None) -> logging.Logger:
return logging.getLogger(name)
MODULE_OPTIONS: List[Option] = []
MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
- def __init__(self, module_name, capsule):
+ def __init__(self, module_name: str, capsule: Any):
super(MgrStandbyModule, self).__init__(capsule)
self.module_name = module_name
else:
self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
- mgr_level = self.get_ceph_option("debug_mgr")
- log_level = self.get_module_option("log_level")
- cluster_level = self.get_module_option('log_to_cluster_level')
+ # mock does not return a str
+ mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
+ log_level = cast(str, self.get_module_option("log_level"))
+ cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
self._configure_logging(mgr_level, log_level, cluster_level,
False, False)
# for backwards compatibility
self._logger = self.getLogger()
- def __del__(self):
+ def __del__(self) -> None:
self._cleanup()
self._unconfigure_logging()
- def _cleanup(self):
+ def _cleanup(self) -> None:
pass
@classmethod
- def _register_options(cls, module_name):
+ def _register_options(cls, module_name: str) -> None:
cls.MODULE_OPTIONS.append(
Option(name='log_level', type='str', default="", runtime=True,
enum_allowed=['info', 'debug', 'critical', 'error',
'warning', '']))
@property
- def log(self):
+ def log(self) -> logging.Logger:
return self._logger
- def serve(self):
+ def serve(self) -> None:
"""
The serve method is mandatory for standby modules.
:return:
"""
raise NotImplementedError()
- def get_mgr_id(self):
+ def get_mgr_id(self) -> str:
return self._ceph_get_mgr_id()
def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
else:
return r
- def get_ceph_option(self, key):
+ def get_ceph_option(self, key: str) -> OptionValue:
return self._ceph_get_option(key)
- def get_store(self, key):
+ def get_store(self, key: str) -> Optional[str]:
"""
Retrieve the value of a persistent KV store entry
"""
return self._ceph_get_store(key)
- def get_active_uri(self):
+ def get_active_uri(self) -> str:
return self._ceph_get_active_uri()
def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
WARN = 3
ERROR = 4
- def __init__(self, module_name, py_modules_ptr, this_ptr):
+ def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object):
self.module_name = module_name
super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
# with default and user-supplied option values.
self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
- mgr_level = self.get_ceph_option("debug_mgr")
- log_level = self.get_module_option("log_level")
- cluster_level = self.get_module_option('log_to_cluster_level')
+ mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
+ log_level = cast(str, self.get_module_option("log_level"))
+ cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
log_to_file = self.get_module_option("log_to_file")
+ assert isinstance(log_to_file, bool)
log_to_cluster = self.get_module_option("log_to_cluster")
+ assert isinstance(log_to_cluster, bool)
self._configure_logging(mgr_level, log_level, cluster_level,
log_to_file, log_to_cluster)
self._perf_schema_cache = None
# Keep a librados instance for those that need it.
- self._rados = None
+ self._rados: Optional[rados.Rados] = None
- def __del__(self):
+ def __del__(self) -> None:
self._unconfigure_logging()
@classmethod
- def _register_options(cls, module_name):
+ def _register_options(cls, module_name: str) -> None:
cls.MODULE_OPTIONS.append(
Option(name='log_level', type='str', default="", runtime=True,
enum_allowed=['info', 'debug', 'critical', 'error',
'warning', '']))
@classmethod
- def _register_commands(cls, module_name):
+ def _register_commands(cls, module_name: str) -> None:
cls.COMMANDS.extend(CLICommand.dump_cmd_list())
@property
- def log(self):
+ def log(self) -> logging.Logger:
return self._logger
- def cluster_log(self, channel, priority, message):
+ def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
"""
:param channel: The log channel. This can be 'cluster', 'audit', ...
:param priority: The log message priority.
self._ceph_cluster_log(channel, priority.value, message)
@property
- def version(self):
+ def version(self) -> str:
return self._version
@property
- def release_name(self):
+ def release_name(self) -> str:
"""
Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.
:return: Returns the release name of the Ceph version in lower case.
"""
return self._ceph_get_release_name()
- def get_context(self):
+ def get_context(self) -> object:
"""
:return: a Python capsule containing a C++ CephContext pointer
"""
return self._ceph_get_context()
- def notify(self, notify_type, notify_id):
+ def notify(self, notify_type: str, notify_id: str) -> None:
"""
Called by the ceph-mgr service to notify the Python plugin
that new state is available.
"""
pass
- def _config_notify(self):
+ def _config_notify(self) -> None:
# check logging options for changes
- mgr_level = self.get_ceph_option("debug_mgr")
- module_level = self.get_module_option("log_level")
- cluster_level = self.get_module_option("log_to_cluster_level")
+ mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
+ module_level = cast(str, self.get_module_option("log_level"))
+ cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
+ assert isinstance(cluster_level, str)
log_to_file = self.get_module_option("log_to_file", False)
+ assert isinstance(log_to_file, bool)
log_to_cluster = self.get_module_option("log_to_cluster", False)
-
+ assert isinstance(log_to_cluster, bool)
self._set_log_level(mgr_level, module_level, cluster_level)
if log_to_file != self.log_to_file:
# call module subclass implementations
self.config_notify()
- def config_notify(self):
+ def config_notify(self) -> None:
"""
Called by the ceph-mgr service to notify the Python plugin
that the configuration may have changed. Modules will want to
"""
pass
- def serve(self):
+ def serve(self) -> None:
"""
Called by the ceph-mgr service to start any server that
is provided by this Python plugin. The implementation
"""
pass
- def shutdown(self):
+ def shutdown(self) -> None:
"""
Called by the ceph-mgr service to request that this
module drop out of its serve() function. You do not
self._rados.shutdown()
self._ceph_unregister_client(addrs)
- def get(self, data_name):
+ def get(self, data_name: str):
"""
Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
"""
return self._ceph_get(data_name)
- def _stattype_to_str(self, stattype):
+ def _stattype_to_str(self, stattype: int) -> str:
typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
if typeonly == 0:
return path, label_names, labels,
- def _perfvalue_to_value(self, stattype, value):
+ def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
if stattype & self.PERFCOUNTER_TIME:
# Convert from ns to seconds
return value / 1000000000.0
else:
return value
- def _unit_to_str(self, unit):
+ def _unit_to_str(self, unit: int) -> str:
if unit == self.NONE:
return "/s"
elif unit == self.BYTES:
return "B/s"
+ else:
+ raise ValueError(f'bad unit "{unit}"')
@staticmethod
- def to_pretty_iec(n):
+ def to_pretty_iec(n: int) -> str:
for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
(20, 'Mi'), (10, 'Ki')]:
if n > 10 << bits:
return str(n) + ' '
@staticmethod
- def get_pretty_row(elems, width):
+ def get_pretty_row(elems: Sequence[str], width: int) -> str:
"""
Takes an array of elements and returns a string with those elements
formatted as a table row. Useful for polling modules.
return ret
- def get_pretty_header(self, elems, width):
+ def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
"""
Like ``get_pretty_row`` but adds dashes, to be used as a table title.
return ret
- def get_server(self, hostname):
+ def get_server(self, hostname: str) -> Union[Dict[str, str],
+ List[Dict[str, str]]]:
"""
Called by the plugin to fetch metadata about a particular hostname from
ceph-mgr.
"""
return self._ceph_get_server(hostname)
- def get_perf_schema(self, svc_type, svc_name):
+ def get_perf_schema(self,
+ svc_type: str,
+ svc_name: str) -> Dict[str,
+ Dict[str, Dict[str, Union[str, int]]]]:
"""
Called by the plugin to fetch perf counter schema info.
svc_name can be nullptr, as can svc_type, in which case
"""
return self._ceph_get_perf_schema(svc_type, svc_name)
- def get_counter(self, svc_type, svc_name, path):
+ def get_counter(self,
+ svc_type: str,
+ svc_name: str,
+ path: str) -> List[Tuple[float, int]]:
"""
Called by the plugin to fetch the latest performance counter data for a
particular counter on a particular service.
"""
return self._ceph_get_counter(svc_type, svc_name, path)
- def get_latest_counter(self, svc_type, svc_name, path):
+ def get_latest_counter(self,
+ svc_type: str,
+ svc_name: str,
+ path: str) -> Dict[str, Union[Tuple[float, int],
+ Tuple[float, int, int]]]:
"""
Called by the plugin to fetch only the newest performance counter data
- pointfor a particular counter on a particular service.
+ point for a particular counter on a particular service.
:param str svc_type:
:param str svc_name:
:param str path: a period-separated concatenation of the subsystem and the
counter name, for example "mds.inodes".
- :return: A list of two-tuples of (timestamp, value) is returned. This may be
- empty if no data is available.
+ :return: A list of two-tuples of (timestamp, value) or three-tuple of
+ (timestamp, value, count) is returned. This may be empty if no
+ data is available.
"""
return self._ceph_get_latest_counter(svc_type, svc_name, path)
- def list_servers(self):
+ def list_servers(self) -> List[Dict[str, List[Dict[str, str]]]]:
"""
Like ``get_server``, but gives information about all servers (i.e. all
unique hostnames that have been mentioned in daemon metadata)
"""
return self._ceph_get_server(None)
- def get_metadata(self, svc_type, svc_id, default=None):
+ def get_metadata(self,
+ svc_type: str,
+ svc_id: str,
+ default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
"""
Fetch the daemon metadata for a particular service.
return default
return metadata
- def get_daemon_status(self, svc_type, svc_id):
+ def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
"""
Fetch the latest status for a particular service daemon.
raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
return r
- def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None):
+ def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
"""
Helper for modules that do simple, synchronous mon command
execution.
svc_id: str,
command: str,
tag: str,
- inbuf: Optional[str] = None):
+ inbuf: Optional[str] = None) -> None:
"""
Called by the plugin to send a command to the mon
cluster.
"""
self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
- def set_health_checks(self, checks):
+ def set_health_checks(self, checks: Dict[str, Dict[str, Sequence[str]]]) -> None:
"""
Set the module's current map of health checks. Argument is a
dict of check names to info, in this form:
"""
self._ceph_set_health_checks(checks)
- def _handle_command(self, inbuf: str, cmd: Dict[str, Any]):
+ def _handle_command(self,
+ inbuf: str,
+ cmd: Dict[str, Any]) -> Union[HandleCommandResult,
+ Tuple[int, str, str]]:
if cmd['prefix'] not in CLICommand.COMMANDS:
return self.handle_command(inbuf, cmd)
return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
- def handle_command(self, inbuf: str, cmd: Dict[str, Any]):
+ def handle_command(self,
+ inbuf: str,
+ cmd: Dict[str, Any]) -> Union[HandleCommandResult,
+ Tuple[int, str, str]]:
"""
Called by ceph-mgr to request the plugin to handle one
of the commands that it declared in self.COMMANDS
# any ``COMMANDS``
raise NotImplementedError()
- def get_mgr_id(self):
+ def get_mgr_id(self) -> str:
"""
Retrieve the name of the manager daemon where this plugin
is currently being executed (i.e. the active manager).
"""
return self._ceph_get_mgr_id()
- def get_ceph_option(self, key):
+ def get_ceph_option(self, key: str) -> OptionValue:
return self._ceph_get_option(key)
- def _validate_module_option(self, key):
+ def _validate_module_option(self, key: str) -> None:
"""
Helper: don't allow get/set config callers to
access config options that they didn't declare
raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
format(key, self.__class__.__name__))
- def _get_module_option(self, key, default, localized_prefix=""):
+ def _get_module_option(self,
+ key: str,
+ default: OptionValue,
+ localized_prefix: str = "") -> OptionValue:
r = self._ceph_get_module_option(self.module_name, key,
localized_prefix)
if r is None:
r = self._ceph_get_module_option(module, key)
return default if r is None else r
- def get_store_prefix(self, key_prefix):
+ def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
"""
Retrieve a dict of KV store keys to values, where the keys
have the given prefix
"""
return self._ceph_get_store_prefix(key_prefix)
- def _set_localized(self, key, val, setter):
+ def _set_localized(self,
+ key: str,
+ val: Optional[str],
+ setter: Callable[[str, Optional[str]], None]) -> None:
return setter(_get_localized_key(self.get_mgr_id(), key), val)
def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
self._validate_module_option(key)
return self._get_module_option(key, default, self.get_mgr_id())
- def _set_module_option(self, key, val):
+ def _set_module_option(self, key: str, val: Any) -> None:
return self._ceph_set_module_option(self.module_name, key,
None if val is None else str(val))
- def set_module_option(self, key, val):
+ def set_module_option(self, key: str, val: Any) -> None:
"""
Set the value of a persistent configuration setting
self._validate_module_option(key)
return self._set_module_option(key, val)
- def set_module_option_ex(self, module, key, val):
+ def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
"""
Set the value of a persistent configuration setting
for the specified module.
self._validate_module_option(key)
return self._ceph_set_module_option(module, key, str(val))
- def set_localized_module_option(self, key, val):
+ def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
"""
Set localized configuration for this ceph-mgr instance
:param str key:
self._validate_module_option(key)
return self._set_localized(key, val, self._set_module_option)
- def set_store(self, key, val):
+ def set_store(self, key: str, val: Optional[str]) -> None:
"""
Set a value in this module's persistent key value store.
If val is None, remove key from store
-
- :param str key:
- :param str val:
"""
self._ceph_set_store(key, val)
- def get_store(self, key, default=None):
+ def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
"""
Get a value from this module's persistent key value store
"""
else:
return r
- def get_localized_store(self, key, default=None):
+ def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
if r is None:
r = self._ceph_get_store(key)
r = default
return r
- def set_localized_store(self, key, val):
+ def set_localized_store(self, key: str, val: Optional[str]) -> None:
return self._set_localized(key, val, self.set_store)
- def self_test(self):
+ def self_test(self) -> None:
"""
Run a self-test on the module. Override this function and implement
a best as possible self-test for (automated) testing of the module
"""
pass
- def get_osdmap(self):
+ def get_osdmap(self) -> OSDMap:
"""
Get a handle to an OSDMap. If epoch==0, get a handle for the latest
OSDMap.
"""
return self._ceph_get_osdmap()
- def get_latest(self, daemon_type, daemon_name, counter):
+ def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
data = self.get_latest_counter(
daemon_type, daemon_name, counter)[counter]
if data:
else:
return 0
- def get_latest_avg(self, daemon_type, daemon_name, counter):
+ def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
data = self.get_latest_counter(
daemon_type, daemon_name, counter)[counter]
if data:
- return data[1], data[2]
+ # https://github.com/python/mypy/issues/1178
+ _, value, count = cast(Tuple[float, int, int], data)
+ return value, count
else:
return 0, 0
@profile_method()
- def get_all_perf_counters(self, prio_limit=PRIO_USEFUL,
- services=("mds", "mon", "osd",
- "rbd-mirror", "rgw", "tcmu-runner")):
+ def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
+ services: Sequence[str] = ("mds", "mon", "osd",
+ "rbd-mirror", "rgw",
+ "tcmu-runner")) -> Dict[str, dict]:
"""
Return the perf counters currently known to this ceph-mgr
instance, filtered by priority equal to or greater than `prio_limit`.
if service['type'] not in services:
continue
- schema = self.get_perf_schema(service['type'], service['id'])
- if not schema:
+ schemas = self.get_perf_schema(service['type'], service['id'])
+ if not schemas:
self.log.warning("No perf counter schema for {0}.{1}".format(
service['type'], service['id']
))
# get just the service we're asking about
svc_full_name = "{0}.{1}".format(
service['type'], service['id'])
- schema = schema[svc_full_name]
+ schema = schemas[svc_full_name]
# Populate latest values
for counter_path, counter_schema in schema.items():
# self.log.debug("{0}: {1}".format(
# counter_path, json.dumps(counter_schema)
# ))
- if counter_schema['priority'] < prio_limit:
+ priority = counter_schema['priority']
+ assert isinstance(priority, int)
+ if priority < prio_limit:
continue
+ tp = counter_schema['type']
+ assert isinstance(tp, int)
counter_info = dict(counter_schema)
-
# Also populate count for the long running avgs
- if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
+ if tp & self.PERFCOUNTER_LONGRUNAVG:
v, c = self.get_latest_avg(
service['type'],
service['id'],
return result
- def set_uri(self, uri):
+ def set_uri(self, uri: str) -> None:
"""
If the module exposes a service, then call this to publish the
address once it is available.
"""
return self._ceph_set_uri(uri)
- def have_mon_connection(self):
+ def have_mon_connection(self) -> bool:
"""
Check whether this ceph-mgr daemon has an open connection
to a monitor. If it doesn't, then it's likely that the
return self._ceph_have_mon_connection()
- def update_progress_event(self, evid, desc, progress, add_to_ceph_s):
- return self._ceph_update_progress_event(str(evid),
- str(desc),
- float(progress),
- bool(add_to_ceph_s))
+ def update_progress_event(self, evid: str, desc: str, progress: float, add_to_ceph_s: bool) -> None:
+ return self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s)
- def complete_progress_event(self, evid):
- return self._ceph_complete_progress_event(str(evid))
+ def complete_progress_event(self, evid: str) -> None:
+ return self._ceph_complete_progress_event(evid)
- def clear_all_progress_events(self):
+ def clear_all_progress_events(self) -> None:
return self._ceph_clear_all_progress_events()
@property
- def rados(self):
+ def rados(self) -> rados.Rados:
"""
A librados instance to be shared by any classes within
this mgr module that want one.
return self._rados
@staticmethod
- def can_run():
+ def can_run() -> Tuple[bool, str]:
"""
Implement this function to report whether the module's dependencies
are met. For example, if the module needs to import a particular
return True, ""
- def remote(self, module_name, method_name, *args, **kwargs):
+ def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
"""
Invoke a method on another module. All arguments, and the return
value from the other module must be serializable.
return self._ceph_dispatch_remote(module_name, method_name,
args, kwargs)
- def add_osd_perf_query(self, query):
+ def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
"""
Register an OSD perf query. Argument is a
dict of the query parameters, in this form:
"""
return self._ceph_add_osd_perf_query(query)
- def remove_osd_perf_query(self, query_id):
+ def remove_osd_perf_query(self, query_id: int) -> None:
"""
Unregister an OSD perf query.
"""
return self._ceph_remove_osd_perf_query(query_id)
- def get_osd_perf_counters(self, query_id):
+ def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, Any]]:
"""
Get stats collected for an OSD perf query.
"""
return self._ceph_get_osd_perf_counters(query_id)
- def add_mds_perf_query(self, query):
+ def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
"""
Register an MDS perf query. Argument is a
dict of the query parameters, in this form:
"""
return self._ceph_add_mds_perf_query(query)
- def remove_mds_perf_query(self, query_id):
+ def remove_mds_perf_query(self, query_id: int) -> None:
"""
Unregister an MDS perf query.
"""
return self._ceph_remove_mds_perf_query(query_id)
- def get_mds_perf_counters(self, query_id):
+ def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, Any]]:
"""
Get stats collected for an MDS perf query.
"""
return self._ceph_get_mds_perf_counters(query_id)
- def is_authorized(self, arguments):
+ def is_authorized(self, arguments: Dict[str, str]) -> bool:
"""
Verifies that the current session caps permit executing the py service
or current module with the provided arguments. This provides a generic