Collect statistics from Ceph cluster and send this back to the Ceph project
when user has opted-in
"""
+import enum
import errno
import hashlib
import json
from datetime import datetime, timedelta
from threading import Event
from collections import defaultdict
+from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING
-from mgr_module import MgrModule, Option
+from mgr_module import MgrModule, Option, OptionValue, Union
ALL_CHANNELS = ['basic', 'ident', 'crash', 'device']
# - crush map stats
class Module(MgrModule):
- config = dict()
-
metadata_keys = [
"arch",
"ceph_version",
]
@property
- def config_keys(self):
+ def config_keys(self) -> Dict[str, OptionValue]:
return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
self.event = Event()
self.run = False
- self.last_upload = None
- self.last_report = dict()
- self.report_id = None
- self.salt = None
-
- def config_notify(self):
+ self.last_upload: Optional[int] = None
+ self.last_report: Dict[str, Any] = dict()
+ self.report_id: Optional[str] = None
+ self.salt: Optional[str] = None
+ # for mypy which does not run the code
+ if TYPE_CHECKING:
+ self.url = ''
+ self.device_url = ''
+ self.enabled = False
+ self.last_opt_revision = 0
+ self.leaderboard = ''
+ self.interval = 0
+ self.proxy = ''
+ self.channel_basic = True
+ self.channel_indent = False
+ self.channel_crash = True
+ self.channel_device = True
+
+ def config_notify(self) -> None:
for opt in self.MODULE_OPTIONS:
setattr(self,
opt['name'],
# wake up serve() thread
self.event.set()
- def load(self):
- self.last_upload = self.get_store('last_upload', None)
- if self.last_upload is not None:
- self.last_upload = int(self.last_upload)
+ def load(self) -> None:
+ last_upload = self.get_store('last_upload', None)
+ if last_upload is None:
+ self.last_upload = None
+ else:
+ self.last_upload = int(last_upload)
- self.report_id = self.get_store('report_id', None)
- if self.report_id is None:
+ report_id = self.get_store('report_id', None)
+ if report_id is None:
self.report_id = str(uuid.uuid4())
self.set_store('report_id', self.report_id)
+ else:
+ self.report_id = report_id
- self.salt = self.get_store('salt', None)
- if not self.salt:
+ salt = self.get_store('salt', None)
+ if salt is None:
self.salt = str(uuid.uuid4())
self.set_store('salt', self.salt)
+ else:
+ self.salt = salt
- def gather_osd_metadata(self, osd_map):
+ def gather_osd_metadata(self,
+ osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
keys = ["osd_objectstore", "rotational"]
keys += self.metadata_keys
- metadata = dict()
+ metadata: Dict[str, Dict[str, int]] = dict()
for key in keys:
metadata[key] = defaultdict(int)
for osd in osd_map['osds']:
- res = self.get_metadata('osd', str(osd['osd'])).items()
+ res = self.get_metadata('osd', str(osd['osd']))
if res is None:
self.log.debug('Could not get metadata for osd.%s' % str(osd['osd']))
continue
- for k, v in res:
+ for k, v in res.items():
if k not in keys:
continue
return metadata
- def gather_mon_metadata(self, mon_map):
+ def gather_mon_metadata(self,
+ mon_map: Dict[str, List[Dict[str, str]]]) -> Dict[str, Dict[str, int]]:
keys = list()
keys += self.metadata_keys
- metadata = dict()
+ metadata: Dict[str, Dict[str, int]] = dict()
for key in keys:
metadata[key] = defaultdict(int)
for mon in mon_map['mons']:
- res = self.get_metadata('mon', mon['name']).items()
+ res = self.get_metadata('mon', mon['name'])
if res is None:
self.log.debug('Could not get metadata for mon.%s' % (mon['name']))
continue
- for k, v in res:
+ for k, v in res.items():
if k not in keys:
continue
return metadata
- def gather_crush_info(self):
+ def gather_crush_info(self) -> Dict[str, Union[int,
+ bool,
+ List[int],
+ Dict[str, int],
+ Dict[int, int]]]:
osdmap = self.get_osdmap()
crush_raw = osdmap.get_crush()
crush = crush_raw.dump()
- def inc(d, k):
+ BucketKeyT = TypeVar('BucketKeyT', int, str)
+
+ def inc(d: Dict[BucketKeyT, int], k: BucketKeyT) -> None:
if k in d:
d[k] += 1
else:
d[k] = 1
- device_classes = {}
+ device_classes: Dict[str, int] = {}
for dev in crush['devices']:
inc(device_classes, dev.get('class', ''))
- bucket_algs = {}
- bucket_types = {}
- bucket_sizes = {}
+ bucket_algs: Dict[str, int] = {}
+ bucket_types: Dict[str, int] = {}
+ bucket_sizes: Dict[int, int] = {}
for bucket in crush['buckets']:
if '~' in bucket['name']: # ignore shadow buckets
continue
'bucket_types': bucket_types,
}
- def gather_configs(self):
+ def gather_configs(self) -> Dict[str, List[str]]:
# cluster config options
cluster = set()
r, outb, outs = self.mon_command({
'active_changed': sorted(list(active)),
}
- def gather_crashinfo(self):
- crashlist = list()
+ def gather_crashinfo(self) -> List[Dict[str, str]]:
+ crashlist: List[Dict[str, str]] = list()
errno, crashids, err = self.remote('crash', 'ls')
if errno:
- return ''
+ return crashlist
for crashid in crashids.split():
cmd = {'id': crashid}
errno, crashinfo, err = self.remote('crash', 'do_info', cmd, '')
# entity_name might have more than one '.', beware
(etype, eid) = c.get('entity_name', '').split('.', 1)
m = hashlib.sha1()
+ assert self.salt
m.update(self.salt.encode('utf-8'))
m.update(eid.encode('utf-8'))
m.update(self.salt.encode('utf-8'))
crashlist.append(c)
return crashlist
- def get_active_channels(self):
+ def get_active_channels(self) -> List[str]:
r = []
if self.channel_basic:
r.append('basic')
r.append('device')
return r
- def gather_device_report(self):
+ def gather_device_report(self) -> Dict[str, Dict[str, Dict[str, str]]]:
try:
time_format = self.remote('devicehealth', 'get_time_format')
except:
- return None
+ return {}
cutoff = datetime.utcnow() - timedelta(hours=self.interval * 2)
min_sample = cutoff.strftime(time_format)
devices = self.get('devices')['devices']
- res = {} # anon-host-id -> anon-devid -> { timestamp -> record }
+ # anon-host-id -> anon-devid -> { timestamp -> record }
+ res: Dict[str, Dict[str, Dict[str, str]]] = {}
for d in devices:
devid = d['devid']
try:
res[anon_host][anon_devid] = m
return res
- def get_latest(self, daemon_type, daemon_name, stat):
+ def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
data = self.get_counter(daemon_type, daemon_name, stat)[stat]
#self.log.error("get_latest {0} data={1}".format(stat, data))
if data:
else:
return 0
- def compile_report(self, channels=[]):
+ def compile_report(self, channels: Optional[List[str]] = None) -> Dict[str, Any]:
if not channels:
channels = self.get_active_channels()
report = {
report['config'] = self.gather_configs()
# pools
- report['rbd'] = {
- 'num_pools': 0,
- 'num_images_by_pool': [],
- 'mirroring_by_pool': [],
- }
+
+ rbd_num_pools = 0
+ rbd_num_images_by_pool = []
+ rbd_mirroring_by_pool = []
num_pg = 0
report['pools'] = list()
for pool in osd_map['pools']:
if k in ['k', 'm', 'plugin', 'technique',
'crush-failure-domain', 'l']
}
- report['pools'].append(
+ cast(List[Dict[str, Any]], report['pools']).append(
{
'pool': pool['pool'],
'type': pool['type'],
}
)
if 'rbd' in pool['application_metadata']:
- report['rbd']['num_pools'] += 1
+ rbd_num_pools += 1
ioctx = self.rados.open_ioctx(pool['pool_name'])
- report['rbd']['num_images_by_pool'].append(
+ rbd_num_images_by_pool.append(
sum(1 for _ in rbd.RBD().list2(ioctx)))
- report['rbd']['mirroring_by_pool'].append(
+ rbd_mirroring_by_pool.append(
rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_DISABLED)
+ report['rbd'] = {
+ 'num_pools': rbd_num_pools,
+ 'num_images_by_pool': rbd_num_images_by_pool,
+ 'mirroring_by_pool': rbd_mirroring_by_pool}
# osds
cluster_network = False
'mds.root_rbytes')
rsnaps = self.get_latest('mds', mds['name'],
'mds.root_rsnaps')
- report['fs']['filesystems'].append({
+ report['fs']['filesystems'].append({ # type: ignore
'max_mds': fs['max_mds'],
'ever_allowed_features': fs['ever_allowed_features'],
'explicitly_allowed_features': fs['explicitly_allowed_features'],
'snaps': rsnaps,
})
num_mds += len(fs['info'])
- report['fs']['total_num_mds'] = num_mds
+ report['fs']['total_num_mds'] = num_mds # type: ignore
# daemons
- report['metadata'] = dict()
- report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
- report['metadata']['mon'] = self.gather_mon_metadata(mon_map)
+ report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
+ mon=self.gather_mon_metadata(mon_map))
# host counts
servers = self.list_servers()
self.log.debug('servers %s' % servers)
- report['hosts'] = {
+ hosts = {
'num': len([h for h in servers if h['hostname']]),
}
for t in ['mon', 'mds', 'osd', 'mgr']:
- report['hosts']['num_with_' + t] = len(
+ hosts['num_with_' + t] = len(
[h for h in servers
if len([s for s in h['services'] if s['type'] == t])]
)
+ report['hosts'] = hosts
report['usage'] = {
'pools': len(df['pools']),
'total_avail_bytes': df['stats']['total_avail_bytes']
}
- report['services'] = defaultdict(int)
+ services: DefaultDict[str, int] = defaultdict(int)
for key, value in service_map['services'].items():
- report['services'][key] += 1
+ services[key] += 1
if key == 'rgw':
- report['rgw'] = {
- 'count': 0,
- }
+ rgw = {}
zones = set()
- realms = set()
zonegroups = set()
frontends = set()
+ count = 0
d = value.get('daemons', dict())
-
- for k,v in d.items():
+ for k, v in d.items():
if k == 'summary' and v:
- report['rgw'][k] = v
+ rgw[k] = v
elif isinstance(v, dict) and 'metadata' in v:
- report['rgw']['count'] += 1
+ count += 1
zones.add(v['metadata']['zone_id'])
zonegroups.add(v['metadata']['zonegroup_id'])
frontends.add(v['metadata']['frontend_type#0'])
if f2:
frontends.add(f2)
- report['rgw']['zones'] = len(zones)
- report['rgw']['zonegroups'] = len(zonegroups)
- report['rgw']['frontends'] = list(frontends) # sets aren't json-serializable
+ rgw['count'] = count
+ rgw['zones'] = len(zones)
+ rgw['zonegroups'] = len(zonegroups)
+ rgw['frontends'] = list(frontends) # sets aren't json-serializable
+ report['rgw'] = rgw
+ report['services'] = services
try:
report['balancer'] = self.remote('balancer', 'gather_telemetry')
return report
- def _try_post(self, what, url, report):
+ def _try_post(self, what: str, url: str, report: Dict[str, Dict[str, str]]) -> Optional[str]:
self.log.info('Sending %s to: %s' % (what, url))
proxies = dict()
if self.proxy:
return fail_reason
return None
- def send(self, report, endpoint=None):
+ class EndPoint(enum.Enum):
+ ceph = 'ceph'
+ device = 'device'
+
+ def send(self,
+ report: Dict[str, Dict[str, str]],
+ endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
if not endpoint:
- endpoint = ['ceph', 'device']
+ endpoint = [self.EndPoint.ceph, self.EndPoint.device]
failed = []
success = []
self.log.debug('Send endpoints %s' % endpoint)
for e in endpoint:
- if e == 'ceph':
+ if e == self.EndPoint.ceph:
fail_reason = self._try_post('ceph report', self.url, report)
if fail_reason:
failed.append(fail_reason)
self.set_store('last_upload', str(now))
success.append('Ceph report sent to {0}'.format(self.url))
self.log.info('Sent report to {0}'.format(self.url))
- elif e == 'device':
+ elif e == self.EndPoint.device:
if 'device' in self.get_active_channels():
devices = self.gather_device_report()
+ assert devices
num_devs = 0
num_hosts = 0
for host, ls in devices.items():
self.log.debug('A telemetry send attempt while opted-out. Asking for license agreement')
return -errno.EPERM, '', "Telemetry data is licensed under the " + LICENSE_NAME + " (" + LICENSE_URL + ").\nTo manually send telemetry data, add '--license " + LICENSE + "' to the 'ceph telemetry send' command.\nPlease consider enabling the telemetry module with 'ceph telemetry on'."
self.last_report = self.compile_report()
- return self.send(self.last_report, command.get('endpoint'))
+ return self.send(self.last_report, endpoint)
elif command['prefix'] == 'telemetry show':
report = self.get_report(channels=command.get('channels', None))
return (-errno.EINVAL, '',
"Command not found '{0}'".format(command['prefix']))
- def on(self):
+ def on(self) -> None:
self.set_module_option('enabled', True)
self.set_module_option('last_opt_revision', REVISION)
- def off(self):
+ def off(self) -> None:
self.set_module_option('enabled', False)
self.set_module_option('last_opt_revision', 1)
- def get_report(self, report_type='default', channels=None):
+ def get_report(self,
+ report_type: str = 'default',
+ channels: Optional[List[str]] = None) -> Dict[str, Any]:
if report_type == 'default':
return self.compile_report(channels=channels)
elif report_type == 'device':
'device_report': self.gather_device_report()}
return {}
- def self_test(self):
+ def self_test(self) -> None:
report = self.compile_report()
if len(report) == 0:
raise RuntimeError('Report is empty')
if 'report_id' not in report:
raise RuntimeError('report_id not found in report')
- def shutdown(self):
+ def shutdown(self) -> None:
self.run = False
self.event.set()
- def refresh_health_checks(self):
+ def refresh_health_checks(self) -> None:
health_checks = {}
if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
health_checks['TELEMETRY_CHANGED'] = {
}
self.set_health_checks(health_checks)
- def serve(self):
+ def serve(self) -> None:
self.load()
self.config_notify()
self.run = True
return True
@staticmethod
- def can_run():
+ def can_run() -> Tuple[bool, str]:
return True, ''