From: Kefu Chai Date: Fri, 26 Feb 2021 03:48:53 +0000 (+0800) Subject: mgr/prometheus: add type annotations X-Git-Tag: v17.1.0~2710^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7f438440f91dea588603b18cbfc00340cd535703;p=ceph.git mgr/prometheus: add type annotations Signed-off-by: Kefu Chai --- diff --git a/src/mypy.ini b/src/mypy.ini index 3e7381cc9ab3..b25d7621aabc 100755 --- a/src/mypy.ini +++ b/src/mypy.ini @@ -52,6 +52,9 @@ disallow_untyped_defs = True [mypy-orchestrator.*] disallow_untyped_defs = True +[mypy-prometheus.*] +disallow_untyped_defs = True + [mypy-rbd_support.*] disallow_untyped_defs = True diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index fd8a71b78b5b..8bde06df4c37 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -1033,8 +1033,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return '' - def _perfpath_to_path_labels(self, daemon, path): - # type: (str, str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]] + def _perfpath_to_path_labels(self, daemon: str, + path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]: label_names = ("ceph_daemon",) # type: Tuple[str, ...] labels = (daemon,) # type: Tuple[str, ...] diff --git a/src/pybind/mgr/prometheus/module.py b/src/pybind/mgr/prometheus/module.py index 2cfedf152e7e..771284d4f4bc 100644 --- a/src/pybind/mgr/prometheus/module.py +++ b/src/pybind/mgr/prometheus/module.py @@ -14,7 +14,7 @@ from mgr_util import get_default_addr, profile_method from rbd import RBD from collections import namedtuple try: - from typing import DefaultDict, Optional, Dict, Any, List, Set, cast + from typing import DefaultDict, Optional, Dict, Any, List, Set, Tuple, Union, cast except ImportError: pass @@ -40,11 +40,10 @@ if cherrypy is not None: # cherrypy likes to sys.exit on error. don't let it take us down too! -def os_exit_noop(*args, **kwargs): +def os_exit_noop(status: int) -> None: pass - -os._exit = os_exit_noop +os._exit = os_exit_noop # type: ignore # to access things in class Module from subclass Root. Because # it's a dict, the writer doesn't need to declare 'global' for access @@ -55,7 +54,7 @@ cherrypy.config.update({ }) -def health_status_to_number(status): +def health_status_to_number(status: str) -> int: if status == 'HEALTH_OK': return 0 elif status == 'HEALTH_WARN': @@ -121,24 +120,24 @@ HEALTH_CHECKS = [ class Metric(object): - def __init__(self, mtype, name, desc, labels=None): + def __init__(self, mtype: str, name: str, desc: str, labels: Optional[Tuple[str, ...]] = None) -> None: self.mtype = mtype self.name = name self.desc = desc self.labelnames = labels # tuple if present - self.value = {} # indexed by label values + self.value: Dict[Tuple[str, ...], Union[float, int]] = {} # indexed by label values - def clear(self): + def clear(self) -> None: self.value = {} - def set(self, value, labelvalues=None): + def set(self, value: Union[float, int], labelvalues: Optional[Tuple[str, ...]] = None) -> None: # labelvalues must be a tuple labelvalues = labelvalues or ('',) self.value[labelvalues] = value - def str_expfmt(self): + def str_expfmt(self) -> str: - def promethize(path): + def promethize(path: str) -> str: ''' replace illegal metric name characters ''' result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus') @@ -151,7 +150,7 @@ class Metric(object): return "ceph_{0}".format(result) - def floatstr(value): + def floatstr(value: float) -> str: ''' represent as Go-compatible float ''' if value == float('inf'): return '+Inf' @@ -189,14 +188,13 @@ class Metric(object): class MetricCollectionThread(threading.Thread): - def __init__(self, module): - # type: (Module) -> None + def __init__(self, module: 'Module') -> None: self.mod = module self.active = True self.event = threading.Event() super(MetricCollectionThread, self).__init__(target=self.collect) - def collect(self): + def collect(self) -> None: self.mod.log.info('starting metric collection thread') while self.active: self.mod.log.debug('collecting cache in thread') @@ -237,7 +235,7 @@ class MetricCollectionThread(threading.Thread): self.mod.log.error('No MON connection') self.event.wait(self.mod.scrape_interval) - def stop(self): + def stop(self) -> None: self.active = False self.event.set() @@ -274,7 +272,7 @@ class Module(MgrModule): STALE_CACHE_FAIL = 'fail' STALE_CACHE_RETURN = 'return' - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super(Module, self).__init__(*args, **kwargs) self.metrics = self._setup_static_metrics() self.shutdown_event = threading.Event() @@ -282,7 +280,7 @@ class Module(MgrModule): self.collect_time = 0.0 self.scrape_interval: float = 15.0 self.stale_cache_strategy: str = self.STALE_CACHE_FAIL - self.collect_cache = None + self.collect_cache: Optional[str] = None self.rbd_stats = { 'pools': {}, 'pools_refresh_time': 0, @@ -305,7 +303,7 @@ class Module(MgrModule): _global_instance = self self.metrics_thread = MetricCollectionThread(_global_instance) - def _setup_static_metrics(self): + def _setup_static_metrics(self) -> Dict[str, Metric]: metrics = {} metrics['health_status'] = Metric( 'untyped', @@ -478,9 +476,9 @@ class Module(MgrModule): return metrics @profile_method() - def get_health(self): + def get_health(self) -> None: - def _get_value(message, delim=' ', word_pos=0): + def _get_value(message: str, delim: str = ' ', word_pos: int = 0) -> Tuple[int, int]: """Extract value from message (default is 1st field)""" v_str = message.split(delim)[word_pos] if v_str.isdigit(): @@ -527,7 +525,7 @@ class Module(MgrModule): self.metrics[path].set(0) @profile_method() - def get_pool_stats(self): + def get_pool_stats(self) -> None: # retrieve pool stats to provide per pool recovery metrics # (osd_pool_stats moved to mgr in Mimic) pstats = self.get('osd_pool_stats') @@ -539,7 +537,7 @@ class Module(MgrModule): ) @profile_method() - def get_df(self): + def get_df(self) -> None: # maybe get the to-be-exported metrics from a config? df = self.get('df') for stat in DF_CLUSTER: @@ -553,18 +551,21 @@ class Module(MgrModule): ) @profile_method() - def get_fs(self): + def get_fs(self) -> None: fs_map = self.get('fs_map') servers = self.get_service_list() self.log.debug('standbys: {}'.format(fs_map['standbys'])) # export standby mds metadata, default standby fs_id is '-1' for standby in fs_map['standbys']: id_ = standby['name'] - host_version = servers.get((id_, 'mds'), ('', '')) + host, version = servers.get((id_, 'mds'), ('', '')) + addr, rank = standby['addr'], standby['rank'] self.metrics['mds_metadata'].set(1, ( 'mds.{}'.format(id_), '-1', - host_version[0], standby['addr'], - standby['rank'], host_version[1] + cast(str, host), + cast(str, addr), + cast(str, rank), + cast(str, version) )) for fs in fs_map['filesystems']: # collect fs metadata @@ -579,15 +580,15 @@ class Module(MgrModule): self.log.debug('mdsmap: {}'.format(fs['mdsmap'])) for gid, daemon in fs['mdsmap']['info'].items(): id_ = daemon['name'] - host_version = servers.get((id_, 'mds'), ('', '')) + host, version = servers.get((id_, 'mds'), ('', '')) self.metrics['mds_metadata'].set(1, ( 'mds.{}'.format(id_), fs['id'], - host_version[0], daemon['addr'], - daemon['rank'], host_version[1] + host, daemon['addr'], + daemon['rank'], version )) @profile_method() - def get_quorum_status(self): + def get_quorum_status(self) -> None: mon_status = json.loads(self.get('mon_status')['json']) servers = self.get_service_list() for mon in mon_status['monmap']['mons']: @@ -605,7 +606,7 @@ class Module(MgrModule): )) @profile_method() - def get_mgr_status(self): + def get_mgr_status(self) -> None: mgr_map = self.get('mgr_map') servers = self.get_service_list() @@ -619,19 +620,17 @@ class Module(MgrModule): for module in mgr_map['available_modules']} for mgr in all_mgrs: - host_version = servers.get((mgr, 'mgr'), ('', '')) + host, version = servers.get((mgr, 'mgr'), ('', '')) if mgr == active: _state = 1 else: _state = 0 self.metrics['mgr_metadata'].set(1, ( - 'mgr.{}'.format(mgr), host_version[0], - host_version[1] + f'mgr.{mgr}', host, version )) self.metrics['mgr_status'].set(_state, ( - 'mgr.{}'.format(mgr), - )) + f'mgr.{mgr}',)) always_on_modules = mgr_map['always_on_modules'].get(self.release_name, []) active_modules = list(always_on_modules) active_modules.extend(mgr_map['modules']) @@ -650,7 +649,7 @@ class Module(MgrModule): self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,)) @profile_method() - def get_pg_status(self): + def get_pg_status(self) -> None: pg_summary = self.get('pg_summary') @@ -669,7 +668,7 @@ class Module(MgrModule): self.log.warning("skipping pg in unknown state {}".format(state)) @profile_method() - def get_osd_stats(self): + def get_osd_stats(self) -> None: osd_stats = self.get('osd_stats') for osd in osd_stats['osd_stats']: id_ = osd['osd'] @@ -679,17 +678,17 @@ class Module(MgrModule): 'osd.{}'.format(id_), )) - def get_service_list(self): + def get_service_list(self) -> Dict[Tuple[str, str], Tuple[str, str]]: ret = {} for server in self.list_servers(): - version = server.get('ceph_version', '') - host = server.get('hostname', '') + version = cast(str, server.get('ceph_version', '')) + host = cast(str, server.get('hostname', '')) for service in cast(List[ServiceInfoT], server.get('services', [])): ret.update({(service['id'], service['type']): (host, version)}) return ret @profile_method() - def get_metadata_and_osd_status(self): + def get_metadata_and_osd_status(self) -> None: osd_map = self.get('osd_map') osd_flags = osd_map['flags'].split(',') for flag in OSD_FLAGS: @@ -814,20 +813,22 @@ class Module(MgrModule): continue mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type, service_id) + rbd_mirror_metadata = cast(Tuple[str, ...], + (mirror_metadata.get(k, '') + for k in RBD_MIRROR_METADATA)) self.metrics['rbd_mirror_metadata'].set( - 1, (mirror_metadata.get(k, '') - for k in RBD_MIRROR_METADATA) + 1, rbd_mirror_metadata ) @profile_method() - def get_num_objects(self): + def get_num_objects(self) -> None: pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum'] for obj in NUM_OBJECTS: stat = 'num_objects_{}'.format(obj) self.metrics[stat].set(pg_sum[stat]) @profile_method() - def get_rbd_stats(self): + def get_rbd_stats(self) -> None: # Per RBD image stats is collected by registering a dynamic osd perf # stats query that tells OSDs to group stats for requests associated # with RBD objects by pool, namespace, and image id, which are @@ -1021,7 +1022,7 @@ class Module(MgrModule): self.metrics[path].set(counters[i][1], labels) i += 1 - def refresh_rbd_stats_pools(self, pools): + def refresh_rbd_stats_pools(self, pools: Dict[str, Set[str]]) -> None: self.log.debug('refreshing rbd pools %s' % (pools)) rbd = RBD() @@ -1066,14 +1067,14 @@ class Module(MgrModule): self.log.error('failed listing pool %s: %s' % (pool_name, e)) self.rbd_stats['pools_refresh_time'] = time.time() - def shutdown_rbd_stats(self): + def shutdown_rbd_stats(self) -> None: if 'query_id' in self.rbd_stats: self.remove_osd_perf_query(self.rbd_stats['query_id']) del self.rbd_stats['query_id'] del self.rbd_stats['query'] self.rbd_stats['pools'].clear() - def add_fixed_name_metrics(self): + def add_fixed_name_metrics(self) -> None: """ Add fixed name metrics from existing ones that have details in their names that should be in labels (not in name). @@ -1083,25 +1084,25 @@ class Module(MgrModule): See: https://tracker.ceph.com/issues/45311 """ new_metrics = {} - for metric_path in self.metrics.keys(): + for metric_path, metrics in self.metrics.items(): # Address RGW sync perf. counters. match = re.search(r'^data-sync-from-(.*)\.', metric_path) if match: new_path = re.sub('from-([^.]*)', 'from-zone', metric_path) if new_path not in new_metrics: new_metrics[new_path] = Metric( - self.metrics[metric_path].mtype, + metrics.mtype, new_path, - self.metrics[metric_path].desc, - self.metrics[metric_path].labelnames + ('source_zone',) + metrics.desc, + cast(Tuple[str, ...], metrics.labelnames) + ('source_zone',) ) - for label_values, value in self.metrics[metric_path].value.items(): + for label_values, value in metrics.value.items(): new_metrics[new_path].set(value, label_values + (match.group(1),)) self.metrics.update(new_metrics) @profile_method(True) - def collect(self): + def collect(self) -> str: # Clear the metrics before scraping for k in self.metrics.keys(): self.metrics[k].clear() @@ -1174,7 +1175,7 @@ class Module(MgrModule): return ''.join(_metrics) + '\n' @CLIReadCommand('prometheus file_sd_config') - def get_file_sd_config(self): + def get_file_sd_config(self) -> Tuple[int, str, str]: ''' Return file_sd compatible prometheus config for mgr cluster ''' @@ -1196,21 +1197,21 @@ class Module(MgrModule): ] return 0, json.dumps(ret), "" - def self_test(self): + def self_test(self) -> None: self.collect() self.get_file_sd_config() - def serve(self): + def serve(self) -> None: class Root(object): # collapse everything to '/' - def _cp_dispatch(self, vpath): + def _cp_dispatch(self, vpath: str) -> 'Root': cherrypy.request.path = '' return self @cherrypy.expose - def index(self): + def index(self) -> str: return ''' Ceph Exporter @@ -1221,20 +1222,19 @@ class Module(MgrModule): ''' @cherrypy.expose - def metrics(self): + def metrics(self) -> Optional[str]: # Lock the function execution assert isinstance(_global_instance, Module) with _global_instance.collect_lock: return self._metrics(_global_instance) @staticmethod - def _metrics(instance): - # type: (Module) -> Any + def _metrics(instance: 'Module') -> Optional[str]: # Return cached data if available if not instance.collect_cache: raise cherrypy.HTTPError(503, 'No cached data available yet') - def respond(): + def respond() -> Optional[str]: assert isinstance(instance, Module) cherrypy.response.headers['Content-Type'] = 'text/plain' return instance.collect_cache @@ -1265,6 +1265,7 @@ class Module(MgrModule): ) instance.log.error(msg) raise cherrypy.HTTPError(503, msg) + return None # Make the cache timeout for collecting configurable self.scrape_interval = cast(float, self.get_localized_module_option('scrape_interval')) @@ -1313,17 +1314,17 @@ class Module(MgrModule): # wait for the metrics collection thread to stop self.metrics_thread.join() - def shutdown(self): + def shutdown(self) -> None: self.log.info('Stopping engine...') self.shutdown_event.set() class StandbyModule(MgrStandbyModule): - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super(StandbyModule, self).__init__(*args, **kwargs) self.shutdown_event = threading.Event() - def serve(self): + def serve(self) -> None: server_addr = self.get_localized_module_option( 'server_addr', get_default_addr()) server_port = self.get_localized_module_option( @@ -1340,7 +1341,7 @@ class StandbyModule(MgrStandbyModule): class Root(object): @cherrypy.expose - def index(self): + def index(self) -> str: active_uri = module.get_active_uri() return ''' @@ -1352,7 +1353,7 @@ class StandbyModule(MgrStandbyModule): '''.format(active_uri) @cherrypy.expose - def metrics(self): + def metrics(self) -> str: cherrypy.response.headers['Content-Type'] = 'text/plain' return '' @@ -1366,7 +1367,7 @@ class StandbyModule(MgrStandbyModule): cherrypy.engine.stop() self.log.info('Engine stopped.') - def shutdown(self): + def shutdown(self) -> None: self.log.info("Stopping engine...") self.shutdown_event.set() self.log.info("Stopped engine")