git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/telemetry: add typing annotations
author Kefu Chai <kchai@redhat.com>
Sat, 30 Jan 2021 12:40:54 +0000 (20:40 +0800)
committer Kefu Chai <kchai@redhat.com>
Wed, 3 Feb 2021 09:17:24 +0000 (17:17 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/mypy.ini
src/pybind/mgr/telemetry/module.py
src/pybind/mgr/tox.ini

index afdb6d528e5be1fb692389b1d3aec3c21ab9fabb..c41a2e3096b2099d6d6cc46419618d06912c009d 100755 (executable)
@@ -52,6 +52,9 @@ disallow_untyped_defs = True
 [mypy-status.*]
 disallow_untyped_defs = True
 
+[mypy-telemetry.*]
+disallow_untyped_defs = True
+
 [mypy-zabbix.*]
 disallow_untyped_defs = True
 
index 2d4289b3bb241326ab9d0e1cc0a027b760bec6f6..3178a6b9f10f0bd30d68532d5dc25b51eeb21ee3 100644 (file)
@@ -4,6 +4,7 @@ Telemetry module for ceph-mgr
 Collect statistics from Ceph cluster and send this back to the Ceph project
 when user has opted-in
 """
+import enum
 import errno
 import hashlib
 import json
@@ -15,8 +16,9 @@ import time
 from datetime import datetime, timedelta
 from threading import Event
 from collections import defaultdict
+from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING
 
-from mgr_module import MgrModule, Option
+from mgr_module import MgrModule, Option, OptionValue, Union
 
 
 ALL_CHANNELS = ['basic', 'ident', 'crash', 'device']
@@ -60,8 +62,6 @@ REVISION = 3
 #   - crush map stats
 
 class Module(MgrModule):
-    config = dict()
-
     metadata_keys = [
             "arch",
             "ceph_version",
@@ -166,19 +166,32 @@ class Module(MgrModule):
     ]
 
     @property
-    def config_keys(self):
+    def config_keys(self) -> Dict[str, OptionValue]:
         return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         super(Module, self).__init__(*args, **kwargs)
         self.event = Event()
         self.run = False
-        self.last_upload = None
-        self.last_report = dict()
-        self.report_id = None
-        self.salt = None
-
-    def config_notify(self):
+        self.last_upload: Optional[int] = None
+        self.last_report: Dict[str, Any] = dict()
+        self.report_id: Optional[str] = None
+        self.salt: Optional[str] = None
+        # for mypy which does not run the code
+        if TYPE_CHECKING:
+            self.url = ''
+            self.device_url = ''
+            self.enabled = False
+            self.last_opt_revision = 0
+            self.leaderboard = ''
+            self.interval = 0
+            self.proxy = ''
+            self.channel_basic = True
+            self.channel_indent = False
+            self.channel_crash = True
+            self.channel_device = True
+
+    def config_notify(self) -> None:
         for opt in self.MODULE_OPTIONS:
             setattr(self,
                     opt['name'],
@@ -187,35 +200,42 @@ class Module(MgrModule):
         # wake up serve() thread
         self.event.set()
 
-    def load(self):
-        self.last_upload = self.get_store('last_upload', None)
-        if self.last_upload is not None:
-            self.last_upload = int(self.last_upload)
+    def load(self) -> None:
+        last_upload = self.get_store('last_upload', None)
+        if last_upload is None:
+            self.last_upload = None
+        else:
+            self.last_upload = int(last_upload)
 
-        self.report_id = self.get_store('report_id', None)
-        if self.report_id is None:
+        report_id = self.get_store('report_id', None)
+        if report_id is None:
             self.report_id = str(uuid.uuid4())
             self.set_store('report_id', self.report_id)
+        else:
+            self.report_id = report_id
 
-        self.salt = self.get_store('salt', None)
-        if not self.salt:
+        salt = self.get_store('salt', None)
+        if salt is None:
             self.salt = str(uuid.uuid4())
             self.set_store('salt', self.salt)
+        else:
+            self.salt = salt
 
-    def gather_osd_metadata(self, osd_map):
+    def gather_osd_metadata(self,
+                            osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
         keys = ["osd_objectstore", "rotational"]
         keys += self.metadata_keys
 
-        metadata = dict()
+        metadata: Dict[str, Dict[str, int]] = dict()
         for key in keys:
             metadata[key] = defaultdict(int)
 
         for osd in osd_map['osds']:
-            res = self.get_metadata('osd', str(osd['osd'])).items()
+            res = self.get_metadata('osd', str(osd['osd']))
             if res is None:
                 self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
                 continue
-            for k, v in res:
+            for k, v in res.items():
                 if k not in keys:
                     continue
 
@@ -223,20 +243,21 @@ class Module(MgrModule):
 
         return metadata
 
-    def gather_mon_metadata(self, mon_map):
+    def gather_mon_metadata(self,
+                            mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
         keys = list()
         keys += self.metadata_keys
 
-        metadata = dict()
+        metadata: Dict[str, Dict[str, int]] = dict()
         for key in keys:
             metadata[key] = defaultdict(int)
 
         for mon in mon_map['mons']:
-            res = self.get_metadata('mon', mon['name']).items()
+            res = self.get_metadata('mon', mon['name'])
             if res is None:
                 self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
                 continue
-            for k, v in res:
+            for k, v in res.items():
                 if k not in keys:
                     continue
 
@@ -244,24 +265,30 @@ class Module(MgrModule):
 
         return metadata
 
-    def gather_crush_info(self):
+    def gather_crush_info(self) -> Dict[str, Union[int,
+                                                   bool,
+                                                   List[int],
+                                                   Dict[str, int],
+                                                   Dict[int, int]]]:
         osdmap = self.get_osdmap()
         crush_raw = osdmap.get_crush()
         crush = crush_raw.dump()
 
-        def inc(d, k):
+        BucketKeyT = TypeVar('BucketKeyT', int, str)
+
+        def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
             if k in d:
                 d[k] += 1
             else:
                 d[k] = 1
 
-        device_classes = {}
+        device_classes: Dict[str, int] = {}
         for dev in crush['devices']:
             inc(device_classes, dev.get('class', ''))
 
-        bucket_algs = {}
-        bucket_types = {}
-        bucket_sizes = {}
+        bucket_algs: Dict[str, int] = {}
+        bucket_types: Dict[str, int] = {}
+        bucket_sizes: Dict[int, int] = {}
         for bucket in crush['buckets']:
             if '~' in bucket['name']:  # ignore shadow buckets
                 continue
@@ -283,7 +310,7 @@ class Module(MgrModule):
             'bucket_types': bucket_types,
         }
 
-    def gather_configs(self):
+    def gather_configs(self) -> Dict[str, List[str]]:
         # cluster config options
         cluster = set()
         r, outb, outs = self.mon_command({
@@ -310,11 +337,11 @@ class Module(MgrModule):
             'active_changed': sorted(list(active)),
         }
 
-    def gather_crashinfo(self):
-        crashlist = list()
+    def gather_crashinfo(self) -> List[Dict[str, str]]:
+        crashlist: List[Dict[str, str]] = list()
         errno, crashids, err = self.remote('crash', 'ls')
         if errno:
-            return ''
+            return crashlist
         for crashid in crashids.split():
             cmd = {'id': crashid}
             errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '')
@@ -325,6 +352,7 @@ class Module(MgrModule):
             # entity_name might have more than one '.', beware
             (etype, eid) = c.get('entity_name', '').split('.', 1)
             m = hashlib.sha1()
+            assert self.salt
             m.update(self.salt.encode('utf-8'))
             m.update(eid.encode('utf-8'))
             m.update(self.salt.encode('utf-8'))
@@ -332,7 +360,7 @@ class Module(MgrModule):
             crashlist.append(c)
         return crashlist
 
-    def get_active_channels(self):
+    def get_active_channels(self) -> List[str]:
         r = []
         if self.channel_basic:
             r.append('basic')
@@ -342,17 +370,18 @@ class Module(MgrModule):
             r.append('device')
         return r
 
-    def gather_device_report(self):
+    def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
         try:
             time_format = self.remote('devicehealth', 'get_time_format')
         except:
-            return None
+            return {}
         cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
         min_sample = cutoff.strftime(time_format)
 
         devices = self.get('devices')['devices']
 
-        res = {}  # anon-host-id -> anon-devid -> { timestamp -> record }
+        # anon-host-id -> anon-devid -> { timestamp -> record }
+        res: Dict[str, Dict[str, Dict[str, str]]] = {}
         for d in devices:
             devid = d['devid']
             try:
@@ -400,7 +429,7 @@ class Module(MgrModule):
             res[anon_host][anon_devid] = m
         return res
 
-    def get_latest(self, daemon_type, daemon_name, stat):
+    def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
         data = self.get_counter(daemon_type, daemon_name, stat)[stat]
         #self.log.error("get_latest {0} data={1}".format(stat, data))
         if data:
@@ -408,7 +437,7 @@ class Module(MgrModule):
         else:
             return 0
 
-    def compile_report(self, channels=[]):
+    def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
         if not channels:
             channels = self.get_active_channels()
         report = {
@@ -464,11 +493,10 @@ class Module(MgrModule):
             report['config'] = self.gather_configs()
 
             # pools
-            report['rbd'] = {
-                'num_pools': 0,
-                'num_images_by_pool': [],
-                'mirroring_by_pool': [],
-            }
+
+            rbd_num_pools = 0
+            rbd_num_images_by_pool = []
+            rbd_mirroring_by_pool = []
             num_pg = 0
             report['pools'] = list()
             for pool in osd_map['pools']:
@@ -482,7 +510,7 @@ class Module(MgrModule):
                         if k in ['k', 'm', 'plugin', 'technique',
                                  'crush-failure-domain', 'l']
                     }
-                report['pools'].append(
+                cast(List[Dict[str, Any]], report['pools']).append(
                     {
                         'pool': pool['pool'],
                         'type': pool['type'],
@@ -499,12 +527,16 @@ class Module(MgrModule):
                     }
                 )
                 if 'rbd' in pool['application_metadata']:
-                    report['rbd']['num_pools'] += 1
+                    rbd_num_pools += 1
                     ioctx = self.rados.open_ioctx(pool['pool_name'])
-                    report['rbd']['num_images_by_pool'].append(
+                    rbd_num_images_by_pool.append(
                         sum(1 for _ in rbd.RBD().list2(ioctx)))
-                    report['rbd']['mirroring_by_pool'].append(
+                    rbd_mirroring_by_pool.append(
                         rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
+            report['rbd'] = {
+                'num_pools': rbd_num_pools,
+                'num_images_by_pool': rbd_num_images_by_pool,
+                'mirroring_by_pool': rbd_mirroring_by_pool}
 
             # osds
             cluster_network = False
@@ -560,7 +592,7 @@ class Module(MgrModule):
                                                  'mds.root_rbytes')
                         rsnaps = self.get_latest('mds', mds['name'],
                                                  'mds.root_rsnaps')
-                report['fs']['filesystems'].append({
+                report['fs']['filesystems'].append({  # type: ignore
                     'max_mds': fs['max_mds'],
                     'ever_allowed_features': fs['ever_allowed_features'],
                     'explicitly_allowed_features': fs['explicitly_allowed_features'],
@@ -584,24 +616,24 @@ class Module(MgrModule):
                     'snaps': rsnaps,
                 })
                 num_mds += len(fs['info'])
-            report['fs']['total_num_mds'] = num_mds
+            report['fs']['total_num_mds'] = num_mds  # type: ignore
 
             # daemons
-            report['metadata'] = dict()
-            report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
-            report['metadata']['mon'] = self.gather_mon_metadata(mon_map)
+            report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
+                                      mon=self.gather_mon_metadata(mon_map))
 
             # host counts
             servers = self.list_servers()
             self.log.debug('servers %s' % servers)
-            report['hosts'] = {
+            hosts = {
                 'num': len([h for h in servers if h['hostname']]),
             }
             for t in ['mon', 'mds', 'osd', 'mgr']:
-                report['hosts']['num_with_' + t] = len(
+                hosts['num_with_' + t] = len(
                     [h for h in servers
                      if len([s for s in h['services'] if s['type'] == t])]
                 )
+            report['hosts'] = hosts
 
             report['usage'] = {
                 'pools': len(df['pools']),
@@ -611,24 +643,21 @@ class Module(MgrModule):
                 'total_avail_bytes': df['stats']['total_avail_bytes']
             }
 
-            report['services'] = defaultdict(int)
+            services: DefaultDict[str, int] = defaultdict(int)
             for key, value in service_map['services'].items():
-                report['services'][key] += 1
+                services[key] += 1
                 if key == 'rgw':
-                    report['rgw'] = {
-                        'count': 0,
-                    }
+                    rgw = {}
                     zones = set()
-                    realms = set()
                     zonegroups = set()
                     frontends = set()
+                    count = 0
                     d = value.get('daemons', dict())
-
-                    for k,v in d.items():
+                    for k, v in d.items():
                         if k == 'summary' and v:
-                            report['rgw'][k] = v
+                            rgw[k] = v
                         elif isinstance(v, dict) and 'metadata' in v:
-                            report['rgw']['count'] += 1
+                            count += 1
                             zones.add(v['metadata']['zone_id'])
                             zonegroups.add(v['metadata']['zonegroup_id'])
                             frontends.add(v['metadata']['frontend_type#0'])
@@ -641,9 +670,12 @@ class Module(MgrModule):
                             if f2:
                                 frontends.add(f2)
 
-                    report['rgw']['zones'] = len(zones)
-                    report['rgw']['zonegroups'] = len(zonegroups)
-                    report['rgw']['frontends'] = list(frontends)  # sets aren't json-serializable
+                    rgw['count'] = count
+                    rgw['zones'] = len(zones)
+                    rgw['zonegroups'] = len(zonegroups)
+                    rgw['frontends'] = list(frontends)  # sets aren't json-serializable
+                    report['rgw'] = rgw
+            report['services'] = services
 
             try:
                 report['balancer'] = self.remote('balancer', 'gather_telemetry')
@@ -660,7 +692,7 @@ class Module(MgrModule):
 
         return report
 
-    def _try_post(self, what, url, report):
+    def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
         self.log.info('Sending %s to: %s' % (what, url))
         proxies = dict()
         if self.proxy:
@@ -676,14 +708,20 @@ class Module(MgrModule):
             return fail_reason
         return None
 
-    def send(self, report, endpoint=None):
+    class EndPoint(enum.Enum):
+        ceph = 'ceph'
+        device = 'device'
+
+    def send(self,
+             report: Dict[str, Dict[str, str]],
+             endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
         if not endpoint:
-            endpoint = ['ceph', 'device']
+            endpoint = [self.EndPoint.ceph, self.EndPoint.device]
         failed = []
         success = []
         self.log.debug('Send endpoints %s' % endpoint)
         for e in endpoint:
-            if e == 'ceph':
+            if e == self.EndPoint.ceph:
                 fail_reason = self._try_post('ceph report', self.url, report)
                 if fail_reason:
                     failed.append(fail_reason)
@@ -693,9 +731,10 @@ class Module(MgrModule):
                     self.set_store('last_upload', str(now))
                     success.append('Ceph report sent to {0}'.format(self.url))
                     self.log.info('Sent report to {0}'.format(self.url))
-            elif e == 'device':
+            elif e == self.EndPoint.device:
                 if 'device' in self.get_active_channels():
                     devices = self.gather_device_report()
+                    assert devices
                     num_devs = 0
                     num_hosts = 0
                     for host, ls in devices.items():
@@ -736,7 +775,7 @@ class Module(MgrModule):
                 self.log.debug('A telemetry send attempt while opted-out. Asking for license agreement')
                 return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo manually send telemetry data, add '--license " + LICENSE + "' to the 'ceph telemetry send' command.\nPlease consider enabling the telemetry module with 'ceph telemetry on'."
             self.last_report = self.compile_report()
-            return self.send(self.last_report, command.get('endpoint'))
+            return self.send(self.last_report, endpoint)
 
         elif command['prefix'] == 'telemetry show':
             report = self.get_report(channels=command.get('channels', None))
@@ -752,15 +791,17 @@ class Module(MgrModule):
             return (-errno.EINVAL, '',
                     "Command not found '{0}'".format(command['prefix']))
 
-    def on(self):
+    def on(self) -> None:
         self.set_module_option('enabled', True)
         self.set_module_option('last_opt_revision', REVISION)
 
-    def off(self):
+    def off(self) -> None:
         self.set_module_option('enabled', False)
         self.set_module_option('last_opt_revision', 1)
 
-    def get_report(self, report_type='default', channels=None):
+    def get_report(self,
+                   report_type: str = 'default',
+                   channels: Optional[List[str]] = None) -> Dict[str, Any]:
         if report_type == 'default':
             return self.compile_report(channels=channels)
         elif report_type == 'device':
@@ -770,7 +811,7 @@ class Module(MgrModule):
                     'device_report': self.gather_device_report()}
         return {}
 
-    def self_test(self):
+    def self_test(self) -> None:
         report = self.compile_report()
         if len(report) == 0:
             raise RuntimeError('Report is empty')
@@ -778,11 +819,11 @@ class Module(MgrModule):
         if 'report_id' not in report:
             raise RuntimeError('report_id not found in report')
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         self.run = False
         self.event.set()
 
-    def refresh_health_checks(self):
+    def refresh_health_checks(self) -> None:
         health_checks = {}
         if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
             health_checks['TELEMETRY_CHANGED'] = {
@@ -794,7 +835,7 @@ class Module(MgrModule):
             }
         self.set_health_checks(health_checks)
 
-    def serve(self):
+    def serve(self) -> None:
         self.load()
         self.config_notify()
         self.run = True
@@ -840,5 +881,5 @@ class Module(MgrModule):
         return True
 
     @staticmethod
-    def can_run():
+    def can_run() -> Tuple[bool, str]:
         return True, ''
index b0916a2299e15592ff921d9e63ba35dd2a1887c8..de5745a6cfad1f62d07a7f744a346c827bc7d638 100644 (file)
@@ -72,6 +72,7 @@ commands =
            -m snap_schedule \
            -m stats \
            -m status \
+           -m telemetry \
            -m test_orchestrator \
            -m volumes \
            -m zabbix