from rbd import RBD
from collections import namedtuple
try:
- from typing import DefaultDict, Optional, Dict, Any, List, Set, cast
+ from typing import DefaultDict, Optional, Dict, Any, List, Set, Tuple, Union, cast
except ImportError:
pass
# cherrypy likes to sys.exit on error. don't let it take us down too!
-def os_exit_noop(*args, **kwargs):
+def os_exit_noop(status: int) -> None:
pass
-
-os._exit = os_exit_noop
+os._exit = os_exit_noop # type: ignore
# to access things in class Module from subclass Root. Because
# it's a dict, the writer doesn't need to declare 'global' for access
})
-def health_status_to_number(status):
+def health_status_to_number(status: str) -> int:
if status == 'HEALTH_OK':
return 0
elif status == 'HEALTH_WARN':
class Metric(object):
- def __init__(self, mtype, name, desc, labels=None):
+ def __init__(self, mtype: str, name: str, desc: str, labels: Optional[Tuple[str, ...]] = None) -> None:
self.mtype = mtype
self.name = name
self.desc = desc
self.labelnames = labels # tuple if present
- self.value = {} # indexed by label values
+ self.value: Dict[Tuple[str, ...], Union[float, int]] = {} # indexed by label values
- def clear(self):
+ def clear(self) -> None:
self.value = {}
- def set(self, value, labelvalues=None):
+ def set(self, value: Union[float, int], labelvalues: Optional[Tuple[str, ...]] = None) -> None:
# labelvalues must be a tuple
labelvalues = labelvalues or ('',)
self.value[labelvalues] = value
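# A hypothetical usage sketch (metric name and label values below are made
# up): each labelled series is stored as one dict entry keyed by its
# label-value tuple.
#
#   m = Metric('gauge', 'osd_up', 'OSD status up', ('ceph_daemon',))
#   m.set(1, ('osd.0',))   # stored as m.value[('osd.0',)] = 1
#   m.set(0, ('osd.1',))   # a second series under the same metric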
- def str_expfmt(self):
+ def str_expfmt(self) -> str:
- def promethize(path):
+ def promethize(path: str) -> str:
''' replace illegal metric name characters '''
result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')
return "ceph_{0}".format(result)
- def floatstr(value):
+ def floatstr(value: float) -> str:
''' represent as Go-compatible float '''
if value == float('inf'):
return '+Inf'
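# e.g. floatstr(float('inf')) -> '+Inf'; the Prometheus exposition format
# expects the Go-style spellings '+Inf', '-Inf' and 'NaN' rather than
# Python's 'inf'/'nan'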
class MetricCollectionThread(threading.Thread):
- def __init__(self, module):
- # type: (Module) -> None
+ def __init__(self, module: 'Module') -> None:
self.mod = module
self.active = True
self.event = threading.Event()
super(MetricCollectionThread, self).__init__(target=self.collect)
- def collect(self):
+ def collect(self) -> None:
self.mod.log.info('starting metric collection thread')
while self.active:
self.mod.log.debug('collecting cache in thread')
self.mod.log.error('No MON connection')
self.event.wait(self.mod.scrape_interval)
- def stop(self):
+ def stop(self) -> None:
self.active = False
self.event.set()
STALE_CACHE_FAIL = 'fail'
STALE_CACHE_RETURN = 'return'
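# A brief note on the two strategies (presumably enforced where the cache
# age is checked, which is elided here): with 'fail' a scrape that finds an
# outdated cache is answered with an HTTP error, with 'return' the stale
# cached payload is served anyway.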
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
self.metrics = self._setup_static_metrics()
self.shutdown_event = threading.Event()
self.collect_time = 0.0
self.scrape_interval: float = 15.0
self.stale_cache_strategy: str = self.STALE_CACHE_FAIL
- self.collect_cache = None
+ self.collect_cache: Optional[str] = None
self.rbd_stats = {
'pools': {},
'pools_refresh_time': 0,
_global_instance = self
self.metrics_thread = MetricCollectionThread(_global_instance)
- def _setup_static_metrics(self):
+ def _setup_static_metrics(self) -> Dict[str, Metric]:
metrics = {}
metrics['health_status'] = Metric(
'untyped',
return metrics
@profile_method()
- def get_health(self):
+ def get_health(self) -> None:
- def _get_value(message, delim=' ', word_pos=0):
+ def _get_value(message: str, delim: str = ' ', word_pos: int = 0) -> Tuple[int, int]:
"""Extract value from message (default is 1st field)"""
v_str = message.split(delim)[word_pos]
if v_str.isdigit():
self.metrics[path].set(0)
@profile_method()
- def get_pool_stats(self):
+ def get_pool_stats(self) -> None:
# retrieve pool stats to provide per pool recovery metrics
# (osd_pool_stats moved to mgr in Mimic)
pstats = self.get('osd_pool_stats')
)
@profile_method()
- def get_df(self):
+ def get_df(self) -> None:
# maybe get the to-be-exported metrics from a config?
df = self.get('df')
for stat in DF_CLUSTER:
)
@profile_method()
- def get_fs(self):
+ def get_fs(self) -> None:
fs_map = self.get('fs_map')
servers = self.get_service_list()
self.log.debug('standbys: {}'.format(fs_map['standbys']))
# export standby mds metadata, default standby fs_id is '-1'
for standby in fs_map['standbys']:
id_ = standby['name']
- host_version = servers.get((id_, 'mds'), ('', ''))
+ host, version = servers.get((id_, 'mds'), ('', ''))
+ addr, rank = standby['addr'], standby['rank']
self.metrics['mds_metadata'].set(1, (
'mds.{}'.format(id_), '-1',
- host_version[0], standby['addr'],
- standby['rank'], host_version[1]
+ cast(str, host),
+ cast(str, addr),
+ cast(str, rank),
+ cast(str, version)
))
for fs in fs_map['filesystems']:
# collect fs metadata
self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
for gid, daemon in fs['mdsmap']['info'].items():
id_ = daemon['name']
- host_version = servers.get((id_, 'mds'), ('', ''))
+ host, version = servers.get((id_, 'mds'), ('', ''))
self.metrics['mds_metadata'].set(1, (
'mds.{}'.format(id_), fs['id'],
- host_version[0], daemon['addr'],
- daemon['rank'], host_version[1]
+ host, daemon['addr'],
+ daemon['rank'], version
))
@profile_method()
- def get_quorum_status(self):
+ def get_quorum_status(self) -> None:
mon_status = json.loads(self.get('mon_status')['json'])
servers = self.get_service_list()
for mon in mon_status['monmap']['mons']:
))
@profile_method()
- def get_mgr_status(self):
+ def get_mgr_status(self) -> None:
mgr_map = self.get('mgr_map')
servers = self.get_service_list()
for module in mgr_map['available_modules']}
for mgr in all_mgrs:
- host_version = servers.get((mgr, 'mgr'), ('', ''))
+ host, version = servers.get((mgr, 'mgr'), ('', ''))
if mgr == active:
_state = 1
else:
_state = 0
self.metrics['mgr_metadata'].set(1, (
- 'mgr.{}'.format(mgr), host_version[0],
- host_version[1]
+ f'mgr.{mgr}', host, version
))
self.metrics['mgr_status'].set(_state, (
- 'mgr.{}'.format(mgr),
- ))
+ f'mgr.{mgr}',))
always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
active_modules = list(always_on_modules)
active_modules.extend(mgr_map['modules'])
self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))
@profile_method()
- def get_pg_status(self):
+ def get_pg_status(self) -> None:
pg_summary = self.get('pg_summary')
self.log.warning("skipping pg in unknown state {}".format(state))
@profile_method()
- def get_osd_stats(self):
+ def get_osd_stats(self) -> None:
osd_stats = self.get('osd_stats')
for osd in osd_stats['osd_stats']:
id_ = osd['osd']
'osd.{}'.format(id_),
))
- def get_service_list(self):
+ def get_service_list(self) -> Dict[Tuple[str, str], Tuple[str, str]]:
ret = {}
for server in self.list_servers():
- version = server.get('ceph_version', '')
- host = server.get('hostname', '')
+ version = cast(str, server.get('ceph_version', ''))
+ host = cast(str, server.get('hostname', ''))
for service in cast(List[ServiceInfoT], server.get('services', [])):
ret.update({(service['id'], service['type']): (host, version)})
return ret
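# Shape of the mapping returned above (host names and versions are made up
# for illustration):
#   {('0', 'osd'): ('ceph-node1', 'ceph version 16.2.0 ...'),
#    ('a', 'mds'): ('ceph-node2', 'ceph version 16.2.0 ...')}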
@profile_method()
- def get_metadata_and_osd_status(self):
+ def get_metadata_and_osd_status(self) -> None:
osd_map = self.get('osd_map')
osd_flags = osd_map['flags'].split(',')
for flag in OSD_FLAGS:
continue
mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
service_id)
+ # build a real tuple: Metric.set() keys stored values by the label-value
+ # tuple, so passing a generator through a cast would not round-trip cleanly
+ rbd_mirror_metadata = tuple(mirror_metadata.get(k, '')
+ for k in RBD_MIRROR_METADATA)
self.metrics['rbd_mirror_metadata'].set(
- 1, (mirror_metadata.get(k, '')
- for k in RBD_MIRROR_METADATA)
+ 1, rbd_mirror_metadata
)
@profile_method()
- def get_num_objects(self):
+ def get_num_objects(self) -> None:
pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
for obj in NUM_OBJECTS:
stat = 'num_objects_{}'.format(obj)
self.metrics[stat].set(pg_sum[stat])
@profile_method()
- def get_rbd_stats(self):
+ def get_rbd_stats(self) -> None:
# Per RBD image stats is collected by registering a dynamic osd perf
# stats query that tells OSDs to group stats for requests associated
# with RBD objects by pool, namespace, and image id, which are
self.metrics[path].set(counters[i][1], labels)
i += 1
- def refresh_rbd_stats_pools(self, pools):
+ def refresh_rbd_stats_pools(self, pools: Dict[str, Set[str]]) -> None:
self.log.debug('refreshing rbd pools %s' % (pools))
rbd = RBD()
self.log.error('failed listing pool %s: %s' % (pool_name, e))
self.rbd_stats['pools_refresh_time'] = time.time()
- def shutdown_rbd_stats(self):
+ def shutdown_rbd_stats(self) -> None:
if 'query_id' in self.rbd_stats:
self.remove_osd_perf_query(self.rbd_stats['query_id'])
del self.rbd_stats['query_id']
del self.rbd_stats['query']
self.rbd_stats['pools'].clear()
- def add_fixed_name_metrics(self):
+ def add_fixed_name_metrics(self) -> None:
"""
Add fixed name metrics from existing ones that have details in their names
that should be in labels (not in name).
See: https://tracker.ceph.com/issues/45311
"""
new_metrics = {}
- for metric_path in self.metrics.keys():
+ for metric_path, metrics in self.metrics.items():
# Address RGW sync perf. counters.
match = re.search(r'^data-sync-from-(.*)\.', metric_path)
if match:
new_path = re.sub('from-([^.]*)', 'from-zone', metric_path)
if new_path not in new_metrics:
new_metrics[new_path] = Metric(
- self.metrics[metric_path].mtype,
+ metrics.mtype,
new_path,
- self.metrics[metric_path].desc,
- self.metrics[metric_path].labelnames + ('source_zone',)
+ metrics.desc,
+ cast(Tuple[str, ...], metrics.labelnames) + ('source_zone',)
)
- for label_values, value in self.metrics[metric_path].value.items():
+ for label_values, value in metrics.value.items():
new_metrics[new_path].set(value, label_values + (match.group(1),))
self.metrics.update(new_metrics)
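# Illustrative effect (zone and counter names are made up): a per-zone
# counter path such as 'data-sync-from-us-east.fetch_bytes' is re-exported
# under the fixed path 'data-sync-from-zone.fetch_bytes' with the extra
# label source_zone='us-east', so the zone ends up as a label rather than
# part of the metric name.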
cast(MetricCounter, count_metric).add(1, (method_name,))
@profile_method(True)
- def collect(self):
+ def collect(self) -> str:
# Clear the metrics before scraping
for k in self.metrics.keys():
self.metrics[k].clear()
return ''.join(_metrics) + '\n'
@CLIReadCommand('prometheus file_sd_config')
- def get_file_sd_config(self):
+ def get_file_sd_config(self) -> Tuple[int, str, str]:
'''
Return file_sd compatible prometheus config for mgr cluster
'''
]
return 0, json.dumps(ret), ""
- def self_test(self):
+ def self_test(self) -> None:
self.collect()
self.get_file_sd_config()
- def serve(self):
+ def serve(self) -> None:
class Root(object):
# collapse everything to '/'
- def _cp_dispatch(self, vpath):
+ def _cp_dispatch(self, vpath: str) -> 'Root':
cherrypy.request.path = ''
return self
@cherrypy.expose
- def index(self):
+ def index(self) -> str:
return '''<!DOCTYPE html>
<html>
<head><title>Ceph Exporter</title></head>
</html>'''
@cherrypy.expose
- def metrics(self):
+ def metrics(self) -> Optional[str]:
# Lock the function execution
assert isinstance(_global_instance, Module)
with _global_instance.collect_lock:
return self._metrics(_global_instance)
@staticmethod
- def _metrics(instance):
- # type: (Module) -> Any
+ def _metrics(instance: 'Module') -> Optional[str]:
# Return cached data if available
if not instance.collect_cache:
raise cherrypy.HTTPError(503, 'No cached data available yet')
- def respond():
+ def respond() -> Optional[str]:
assert isinstance(instance, Module)
cherrypy.response.headers['Content-Type'] = 'text/plain'
return instance.collect_cache
)
instance.log.error(msg)
raise cherrypy.HTTPError(503, msg)
+ return None
# Make the cache timeout for collecting configurable
self.scrape_interval = cast(float, self.get_localized_module_option('scrape_interval'))
# wait for the metrics collection thread to stop
self.metrics_thread.join()
- def shutdown(self):
+ def shutdown(self) -> None:
self.log.info('Stopping engine...')
self.shutdown_event.set()
class StandbyModule(MgrStandbyModule):
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(StandbyModule, self).__init__(*args, **kwargs)
self.shutdown_event = threading.Event()
- def serve(self):
+ def serve(self) -> None:
server_addr = self.get_localized_module_option(
'server_addr', get_default_addr())
server_port = self.get_localized_module_option(
class Root(object):
@cherrypy.expose
- def index(self):
+ def index(self) -> str:
active_uri = module.get_active_uri()
return '''<!DOCTYPE html>
<html>
</html>'''.format(active_uri)
@cherrypy.expose
- def metrics(self):
+ def metrics(self) -> str:
cherrypy.response.headers['Content-Type'] = 'text/plain'
return ''
cherrypy.engine.stop()
self.log.info('Engine stopped.')
- def shutdown(self):
+ def shutdown(self) -> None:
self.log.info("Stopping engine...")
self.shutdown_event.set()
self.log.info("Stopped engine")