From 964dd9381976c4de1161c48de3f4b0b57e2df466 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sat, 30 Jan 2021 20:40:54 +0800 Subject: [PATCH] mgr/telemetry: add typing annotations Signed-off-by: Kefu Chai --- src/mypy.ini | 3 + src/pybind/mgr/telemetry/module.py | 213 +++++++++++++++++------------ src/pybind/mgr/tox.ini | 1 + 3 files changed, 131 insertions(+), 86 deletions(-) diff --git a/src/mypy.ini b/src/mypy.ini index afdb6d528e5be..c41a2e3096b20 100755 --- a/src/mypy.ini +++ b/src/mypy.ini @@ -52,6 +52,9 @@ disallow_untyped_defs = True [mypy-status.*] disallow_untyped_defs = True +[mypy-telemetry.*] +disallow_untyped_defs = True + [mypy-zabbix.*] disallow_untyped_defs = True diff --git a/src/pybind/mgr/telemetry/module.py b/src/pybind/mgr/telemetry/module.py index 2d4289b3bb241..3178a6b9f10f0 100644 --- a/src/pybind/mgr/telemetry/module.py +++ b/src/pybind/mgr/telemetry/module.py @@ -4,6 +4,7 @@ Telemetry module for ceph-mgr Collect statistics from Ceph cluster and send this back to the Ceph project when user has opted-in """ +import enum import errno import hashlib import json @@ -15,8 +16,9 @@ import time from datetime import datetime, timedelta from threading import Event from collections import defaultdict +from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING -from mgr_module import MgrModule, Option +from mgr_module import MgrModule, Option, OptionValue, Union ALL_CHANNELS = ['basic', 'ident', 'crash', 'device'] @@ -60,8 +62,6 @@ REVISION = 3 # - crush map stats class Module(MgrModule): - config = dict() - metadata_keys = [ "arch", "ceph_version", @@ -166,19 +166,32 @@ class Module(MgrModule): ] @property - def config_keys(self): + def config_keys(self) -> Dict[str, OptionValue]: return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS) - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super(Module, self).__init__(*args, **kwargs) self.event = Event() self.run = False - self.last_upload = None - self.last_report = dict() - self.report_id = None - self.salt = None - - def config_notify(self): + self.last_upload: Optional[int] = None + self.last_report: Dict[str, Any] = dict() + self.report_id: Optional[str] = None + self.salt: Optional[str] = None + # for mypy which does not run the code + if TYPE_CHECKING: + self.url = '' + self.device_url = '' + self.enabled = False + self.last_opt_revision = 0 + self.leaderboard = '' + self.interval = 0 + self.proxy = '' + self.channel_basic = True + self.channel_indent = False + self.channel_crash = True + self.channel_device = True + + def config_notify(self) -> None: for opt in self.MODULE_OPTIONS: setattr(self, opt['name'], @@ -187,35 +200,42 @@ class Module(MgrModule): # wake up serve() thread self.event.set() - def load(self): - self.last_upload = self.get_store('last_upload', None) - if self.last_upload is not None: - self.last_upload = int(self.last_upload) + def load(self) -> None: + last_upload = self.get_store('last_upload', None) + if last_upload is None: + self.last_upload = None + else: + self.last_upload = int(last_upload) - self.report_id = self.get_store('report_id', None) - if self.report_id is None: + report_id = self.get_store('report_id', None) + if report_id is None: self.report_id = str(uuid.uuid4()) self.set_store('report_id', self.report_id) + else: + self.report_id = report_id - self.salt = self.get_store('salt', None) - if not self.salt: + salt = self.get_store('salt', None) + if salt is None: self.salt = str(uuid.uuid4()) self.set_store('salt', self.salt) + else: + self.salt = salt - def gather_osd_metadata(self, osd_map): + def gather_osd_metadata(self, + osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]: keys = ["osd_objectstore", "rotational"] keys += self.metadata_keys - metadata = dict() + metadata: Dict[str, Dict[str, int]] = dict() for key in keys: metadata[key] = defaultdict(int) for osd in osd_map['osds']: - res = self.get_metadata('osd', str(osd['osd'])).items() + res = self.get_metadata('osd', str(osd['osd'])) if res is None: self.log.debug('Could not get metadata for osd.%s' % str(osd['osd'])) continue - for k, v in res: + for k, v in res.items(): if k not in keys: continue @@ -223,20 +243,21 @@ class Module(MgrModule): return metadata - def gather_mon_metadata(self, mon_map): + def gather_mon_metadata(self, + mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]: keys = list() keys += self.metadata_keys - metadata = dict() + metadata: Dict[str, Dict[str, int]] = dict() for key in keys: metadata[key] = defaultdict(int) for mon in mon_map['mons']: - res = self.get_metadata('mon', mon['name']).items() + res = self.get_metadata('mon', mon['name']) if res is None: self.log.debug('Could not get metadata for mon.%s' % (mon['name'])) continue - for k, v in res: + for k, v in res.items(): if k not in keys: continue @@ -244,24 +265,30 @@ class Module(MgrModule): return metadata - def gather_crush_info(self): + def gather_crush_info(self) -> Dict[str, Union[int, + bool, + List[int], + Dict[str, int], + Dict[int, int]]]: osdmap = self.get_osdmap() crush_raw = osdmap.get_crush() crush = crush_raw.dump() - def inc(d, k): + BucketKeyT = TypeVar('BucketKeyT', int, str) + + def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None: if k in d: d[k] += 1 else: d[k] = 1 - device_classes = {} + device_classes: Dict[str, int] = {} for dev in crush['devices']: inc(device_classes, dev.get('class', '')) - bucket_algs = {} - bucket_types = {} - bucket_sizes = {} + bucket_algs: Dict[str, int] = {} + bucket_types: Dict[str, int] = {} + bucket_sizes: Dict[int, int] = {} for bucket in crush['buckets']: if '~' in bucket['name']: # ignore shadow buckets continue @@ -283,7 +310,7 @@ class Module(MgrModule): 'bucket_types': bucket_types, } - def gather_configs(self): + def gather_configs(self) -> Dict[str, List[str]]: # cluster config options cluster = set() r, outb, outs = self.mon_command({ @@ -310,11 +337,11 @@ class Module(MgrModule): 'active_changed': sorted(list(active)), } - def gather_crashinfo(self): - crashlist = list() + def gather_crashinfo(self) -> List[Dict[str, str]]: + crashlist: List[Dict[str, str]] = list() errno, crashids, err = self.remote('crash', 'ls') if errno: - return '' + return crashlist for crashid in crashids.split(): cmd = {'id': crashid} errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '') @@ -325,6 +352,7 @@ class Module(MgrModule): # entity_name might have more than one '.', beware (etype, eid) = c.get('entity_name', '').split('.', 1) m = hashlib.sha1() + assert self.salt m.update(self.salt.encode('utf-8')) m.update(eid.encode('utf-8')) m.update(self.salt.encode('utf-8')) @@ -332,7 +360,7 @@ class Module(MgrModule): crashlist.append(c) return crashlist - def get_active_channels(self): + def get_active_channels(self) -> List[str]: r = [] if self.channel_basic: r.append('basic') @@ -342,17 +370,18 @@ class Module(MgrModule): r.append('device') return r - def gather_device_report(self): + def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]: try: time_format = self.remote('devicehealth', 'get_time_format') except: - return None + return {} cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2) min_sample = cutoff.strftime(time_format) devices = self.get('devices')['devices'] - res = {} # anon-host-id -> anon-devid -> { timestamp -> record } + # anon-host-id -> anon-devid -> { timestamp -> record } + res: Dict[str, Dict[str, Dict[str, str]]] = {} for d in devices: devid = d['devid'] try: @@ -400,7 +429,7 @@ class Module(MgrModule): res[anon_host][anon_devid] = m return res - def get_latest(self, daemon_type, daemon_name, stat): + def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int: data = self.get_counter(daemon_type, daemon_name, stat)[stat] #self.log.error("get_latest {0} data={1}".format(stat, data)) if data: @@ -408,7 +437,7 @@ class Module(MgrModule): else: return 0 - def compile_report(self, channels=[]): + def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]: if not channels: channels = self.get_active_channels() report = { @@ -464,11 +493,10 @@ class Module(MgrModule): report['config'] = self.gather_configs() # pools - report['rbd'] = { - 'num_pools': 0, - 'num_images_by_pool': [], - 'mirroring_by_pool': [], - } + + rbd_num_pools = 0 + rbd_num_images_by_pool = [] + rbd_mirroring_by_pool = [] num_pg = 0 report['pools'] = list() for pool in osd_map['pools']: @@ -482,7 +510,7 @@ class Module(MgrModule): if k in ['k', 'm', 'plugin', 'technique', 'crush-failure-domain', 'l'] } - report['pools'].append( + cast(List[Dict[str, Any]], report['pools']).append( { 'pool': pool['pool'], 'type': pool['type'], @@ -499,12 +527,16 @@ class Module(MgrModule): } ) if 'rbd' in pool['application_metadata']: - report['rbd']['num_pools'] += 1 + rbd_num_pools += 1 ioctx = self.rados.open_ioctx(pool['pool_name']) - report['rbd']['num_images_by_pool'].append( + rbd_num_images_by_pool.append( sum(1 for _ in rbd.RBD().list2(ioctx))) - report['rbd']['mirroring_by_pool'].append( + rbd_mirroring_by_pool.append( rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED) + report['rbd'] = { + 'num_pools': rbd_num_pools, + 'num_images_by_pool': rbd_num_images_by_pool, + 'mirroring_by_pool': rbd_mirroring_by_pool} # osds cluster_network = False @@ -560,7 +592,7 @@ class Module(MgrModule): 'mds.root_rbytes') rsnaps = self.get_latest('mds', mds['name'], 'mds.root_rsnaps') - report['fs']['filesystems'].append({ + report['fs']['filesystems'].append({ # type: ignore 'max_mds': fs['max_mds'], 'ever_allowed_features': fs['ever_allowed_features'], 'explicitly_allowed_features': fs['explicitly_allowed_features'], @@ -584,24 +616,24 @@ class Module(MgrModule): 'snaps': rsnaps, }) num_mds += len(fs['info']) - report['fs']['total_num_mds'] = num_mds + report['fs']['total_num_mds'] = num_mds # type: ignore # daemons - report['metadata'] = dict() - report['metadata']['osd'] = self.gather_osd_metadata(osd_map) - report['metadata']['mon'] = self.gather_mon_metadata(mon_map) + report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map), + mon=self.gather_mon_metadata(mon_map)) # host counts servers = self.list_servers() self.log.debug('servers %s' % servers) - report['hosts'] = { + hosts = { 'num': len([h for h in servers if h['hostname']]), } for t in ['mon', 'mds', 'osd', 'mgr']: - report['hosts']['num_with_' + t] = len( + hosts['num_with_' + t] = len( [h for h in servers if len([s for s in h['services'] if s['type'] == t])] ) + report['hosts'] = hosts report['usage'] = { 'pools': len(df['pools']), @@ -611,24 +643,21 @@ class Module(MgrModule): 'total_avail_bytes': df['stats']['total_avail_bytes'] } - report['services'] = defaultdict(int) + services: DefaultDict[str, int] = defaultdict(int) for key, value in service_map['services'].items(): - report['services'][key] += 1 + services[key] += 1 if key == 'rgw': - report['rgw'] = { - 'count': 0, - } + rgw = {} zones = set() - realms = set() zonegroups = set() frontends = set() + count = 0 d = value.get('daemons', dict()) - - for k,v in d.items(): + for k, v in d.items(): if k == 'summary' and v: - report['rgw'][k] = v + rgw[k] = v elif isinstance(v, dict) and 'metadata' in v: - report['rgw']['count'] += 1 + count += 1 zones.add(v['metadata']['zone_id']) zonegroups.add(v['metadata']['zonegroup_id']) frontends.add(v['metadata']['frontend_type#0']) @@ -641,9 +670,12 @@ class Module(MgrModule): if f2: frontends.add(f2) - report['rgw']['zones'] = len(zones) - report['rgw']['zonegroups'] = len(zonegroups) - report['rgw']['frontends'] = list(frontends) # sets aren't json-serializable + rgw['count'] = count + rgw['zones'] = len(zones) + rgw['zonegroups'] = len(zonegroups) + rgw['frontends'] = list(frontends) # sets aren't json-serializable + report['rgw'] = rgw + report['services'] = services try: report['balancer'] = self.remote('balancer', 'gather_telemetry') @@ -660,7 +692,7 @@ class Module(MgrModule): return report - def _try_post(self, what, url, report): + def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]: self.log.info('Sending %s to: %s' % (what, url)) proxies = dict() if self.proxy: @@ -676,14 +708,20 @@ class Module(MgrModule): return fail_reason return None - def send(self, report, endpoint=None): + class EndPoint(enum.Enum): + ceph = 'ceph' + device = 'device' + + def send(self, + report: Dict[str, Dict[str, str]], + endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]: if not endpoint: - endpoint = ['ceph', 'device'] + endpoint = [self.EndPoint.ceph, self.EndPoint.device] failed = [] success = [] self.log.debug('Send endpoints %s' % endpoint) for e in endpoint: - if e == 'ceph': + if e == self.EndPoint.ceph: fail_reason = self._try_post('ceph report', self.url, report) if fail_reason: failed.append(fail_reason) @@ -693,9 +731,10 @@ class Module(MgrModule): self.set_store('last_upload', str(now)) success.append('Ceph report sent to {0}'.format(self.url)) self.log.info('Sent report to {0}'.format(self.url)) - elif e == 'device': + elif e == self.EndPoint.device: if 'device' in self.get_active_channels(): devices = self.gather_device_report() + assert devices num_devs = 0 num_hosts = 0 for host, ls in devices.items(): @@ -736,7 +775,7 @@ class Module(MgrModule): self.log.debug('A telemetry send attempt while opted-out. Asking for license agreement') return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo manually send telemetry data, add '--license " + LICENSE + "' to the 'ceph telemetry send' command.\nPlease consider enabling the telemetry module with 'ceph telemetry on'." self.last_report = self.compile_report() - return self.send(self.last_report, command.get('endpoint')) + return self.send(self.last_report, endpoint) elif command['prefix'] == 'telemetry show': report = self.get_report(channels=command.get('channels', None)) @@ -752,15 +791,17 @@ class Module(MgrModule): return (-errno.EINVAL, '', "Command not found '{0}'".format(command['prefix'])) - def on(self): + def on(self) -> None: self.set_module_option('enabled', True) self.set_module_option('last_opt_revision', REVISION) - def off(self): + def off(self) -> None: self.set_module_option('enabled', False) self.set_module_option('last_opt_revision', 1) - def get_report(self, report_type='default', channels=None): + def get_report(self, + report_type: str = 'default', + channels: Optional[List[str]] = None) -> Dict[str, Any]: if report_type == 'default': return self.compile_report(channels=channels) elif report_type == 'device': @@ -770,7 +811,7 @@ class Module(MgrModule): 'device_report': self.gather_device_report()} return {} - def self_test(self): + def self_test(self) -> None: report = self.compile_report() if len(report) == 0: raise RuntimeError('Report is empty') @@ -778,11 +819,11 @@ class Module(MgrModule): if 'report_id' not in report: raise RuntimeError('report_id not found in report') - def shutdown(self): + def shutdown(self) -> None: self.run = False self.event.set() - def refresh_health_checks(self): + def refresh_health_checks(self) -> None: health_checks = {} if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN: health_checks['TELEMETRY_CHANGED'] = { @@ -794,7 +835,7 @@ class Module(MgrModule): } self.set_health_checks(health_checks) - def serve(self): + def serve(self) -> None: self.load() self.config_notify() self.run = True @@ -840,5 +881,5 @@ class Module(MgrModule): return True @staticmethod - def can_run(): + def can_run() -> Tuple[bool, str]: return True, '' diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini index b0916a2299e15..de5745a6cfad1 100644 --- a/src/pybind/mgr/tox.ini +++ b/src/pybind/mgr/tox.ini @@ -72,6 +72,7 @@ commands = -m snap_schedule \ -m stats \ -m status \ + -m telemetry \ -m test_orchestrator \ -m volumes \ -m zabbix -- 2.39.5