From 0e1909417c7b6b69a525f2245f803cec94e7ca54 Mon Sep 17 00:00:00 2001 From: Yaarit Hatuka Date: Mon, 15 Nov 2021 16:53:59 +0000 Subject: [PATCH] mgr/telemetry: introduce new design for adding new data MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The current design requires increasing the telemetry revision each time we add new data to the report. As a result, users need to re-opt-in to telemetry. This new design allows for adding new data to the report, while allowing users to keep sending only what they already opted-in to, hence no re-opt-in is required. In case users wish to report the new data as well, they need to re-opt-in and enable any new channels. Also, move formatting perf histograms to a function, so we can use it both in `show` and `preview` commands. Fix get_report call in dashboard to use get_report_locked. Signed-off-by: Yaarit Hatuka --- .../mgr/dashboard/controllers/telemetry.py | 2 +- src/pybind/mgr/telemetry/module.py | 445 +++++++++++++++--- 2 files changed, 390 insertions(+), 57 deletions(-) diff --git a/src/pybind/mgr/dashboard/controllers/telemetry.py b/src/pybind/mgr/dashboard/controllers/telemetry.py index 03f34592b1c..792f5471156 100644 --- a/src/pybind/mgr/dashboard/controllers/telemetry.py +++ b/src/pybind/mgr/dashboard/controllers/telemetry.py @@ -213,7 +213,7 @@ class Telemetry(RESTController): :return: Ceph and device report data :rtype: dict """ - return mgr.remote('telemetry', 'get_report', 'all') + return mgr.remote('telemetry', 'get_report_locked', 'all') def singleton_set(self, enable=True, license_name=None): """ diff --git a/src/pybind/mgr/telemetry/module.py b/src/pybind/mgr/telemetry/module.py index 3383516dc3d..c34239dedbe 100644 --- a/src/pybind/mgr/telemetry/module.py +++ b/src/pybind/mgr/telemetry/module.py @@ -15,7 +15,7 @@ import requests import uuid import time from datetime import datetime, timedelta -from threading import Event +from threading import Event, Lock from 
collections import defaultdict from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union @@ -28,11 +28,6 @@ LICENSE = 'sharing-1-0' LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0' LICENSE_URL = 'https://cdla.io/sharing-1-0/' -# If the telemetry revision has changed since this point, re-require -# an opt-in. This should happen each time we add new information to -# the telemetry report. -LAST_REVISION_RE_OPT_IN = 2 - # Latest revision of the telemetry report. Bump this each time we make # *any* change. REVISION = 3 @@ -62,6 +57,53 @@ REVISION = 3 # - rgw daemons, zones, zonegroups; which rgw frontends # - crush map stats +class Collection(str, enum.Enum): + basic_base = 'basic_base' + device_base = 'device_base' + crash_base = 'crash_base' + ident_base = 'ident_base' + perf_perf = 'perf_perf' + basic_mds_metadata = 'basic_mds_metadata' + +MODULE_COLLECTION = [ + { + "name": Collection.basic_base, + "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)", + "channel": "basic", + "nag": False + }, + { + "name": Collection.device_base, + "description": "Information about device health metrics", + "channel": "device", + "nag": False + }, + { + "name": Collection.crash_base, + "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)", + "channel": "crash", + "nag": False + }, + { + "name": Collection.ident_base, + "description": "User-provided identifying information about the cluster", + "channel": "ident", + "nag": False + }, + { + "name": Collection.perf_perf, + "description": "Information about performance counters of the cluster", + "channel": "perf", + "nag": True + }, + { + "name": Collection.basic_mds_metadata, + "description": "MDS metadata", + "channel": "basic", + "nag": False + } +] + class Module(MgrModule): metadata_keys = [ "arch", @@ -137,10 +179,14 @@ class Module(MgrModule): super(Module, 
self).__init__(*args, **kwargs) self.event = Event() self.run = False + self.db_collection: List[str] = None + self.last_opted_in_ceph_version: int = None + self.last_opted_out_ceph_version: int = None self.last_upload: Optional[int] = None self.last_report: Dict[str, Any] = dict() self.report_id: Optional[str] = None self.salt: Optional[str] = None + self.get_report_lock = Lock() self.config_update_module_option() # for mypy which does not run the code if TYPE_CHECKING: @@ -156,6 +202,9 @@ class Module(MgrModule): self.channel_crash = True self.channel_device = True self.channel_perf = False + self.db_collection = ['basic_base', 'device_base'] + self.last_opted_in_ceph_version = 17 + self.last_opted_out_ceph_version = 0 def config_update_module_option(self) -> None: for opt in self.MODULE_OPTIONS: @@ -190,6 +239,20 @@ class Module(MgrModule): else: self.salt = salt + self.init_collection() + + last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None) + if last_opted_in_ceph_version is None: + self.last_opted_in_ceph_version = None + else: + self.last_opted_in_ceph_version = int(last_opted_in_ceph_version) + + last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None) + if last_opted_out_ceph_version is None: + self.last_opted_out_ceph_version = None + else: + self.last_opted_out_ceph_version = int(last_opted_out_ceph_version) + def gather_osd_metadata(self, osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]: keys = ["osd_objectstore", "rotational"] @@ -234,6 +297,29 @@ class Module(MgrModule): return metadata + def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]: + metadata: Dict[str, Dict[str, int]] = dict() + + res = self.get('mds_metadata') # metadata of *all* mds daemons + if res is None or not res: + self.log.debug('Could not get metadata for mds daemons') + return metadata + + keys = list() + keys += self.metadata_keys + + for key in keys: + metadata[key] = defaultdict(int) + + for mds 
in res.values(): + for k, v in mds.items(): + if k not in keys: + continue + + metadata[k][v] += 1 + + return metadata + def gather_crush_info(self) -> Dict[str, Union[int, bool, List[int], @@ -601,7 +687,7 @@ class Module(MgrModule): for collection in all_perf_counters[daemon]: # Split the collection to avoid redundancy in final report; i.e.: - # bluestore.kv_flush_lat, bluestore.kv_final_lat --> + # bluestore.kv_flush_lat, bluestore.kv_final_lat --> # bluestore: kv_flush_lat, kv_final_lat col_0, col_1 = collection.split('.') @@ -743,6 +829,8 @@ class Module(MgrModule): 'channels': channels, 'channels_available': ALL_CHANNELS, 'license': LICENSE, + 'collections_available': [c['name'].name for c in MODULE_COLLECTION], + 'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])], } if 'ident' in channels: @@ -911,8 +999,12 @@ class Module(MgrModule): report['fs']['total_num_mds'] = num_mds # type: ignore # daemons - report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map), - mon=self.gather_mon_metadata(mon_map)) + report['metadata'] = dict() + report['metadata']['osd'] = self.gather_osd_metadata(osd_map) + report['metadata']['mon'] = self.gather_mon_metadata(mon_map) + + if self.is_enabled_collection(Collection.basic_mds_metadata): + report['metadata']['mds'] = self.gather_mds_metadata() # host counts servers = self.list_servers() @@ -981,14 +1073,15 @@ class Module(MgrModule): report['crashes'] = self.gather_crashinfo() if 'perf' in channels: - report['perf_counters'] = self.gather_perf_counters('separated') - report['stats_per_pool'] = self.get_stats_per_pool() - report['stats_per_pg'] = self.get_stats_per_pg() - report['io_rate'] = self.get_io_rate() - report['osd_perf_histograms'] = self.get_osd_histograms('separated') - report['mempool'] = self.get_mempool('separated') - report['heap_stats'] = self.get_heap_stats() - report['rocksdb_stats'] = self.get_rocksdb_stats() + if 
self.is_enabled_collection(Collection.perf_perf): + report['perf_counters'] = self.gather_perf_counters('separated') + report['stats_per_pool'] = self.get_stats_per_pool() + report['stats_per_pg'] = self.get_stats_per_pg() + report['io_rate'] = self.get_io_rate() + report['osd_perf_histograms'] = self.get_osd_histograms('separated') + report['mempool'] = self.get_mempool('separated') + report['heap_stats'] = self.get_heap_stats() + report['rocksdb_stats'] = self.get_rocksdb_stats() # NOTE: We do not include the 'device' channel in this report; it is # sent to a different endpoint. @@ -1015,6 +1108,117 @@ class Module(MgrModule): ceph = 'ceph' device = 'device' + def collection_delta(self) -> List[Collection]: + ''' + Find collections that are available in the module, but are not in the db + ''' + new_collection : List[Collection] = [] + + for c in MODULE_COLLECTION: + for k, v in c.items(): + if k == 'name' and v.name not in self.db_collection: + new_collection.append(v) + + return new_collection + + def is_major_upgrade(self) -> bool: + ''' + Returns True only if the user last opted-in to an older major + ''' + if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0: + # we do not know what Ceph version was when the user last opted-in, + # thus we do not wish to nag in case of a major upgrade + return False + + mon_map = self.get('mon_map') + mon_min = mon_map.get("min_mon_release", 0) + + if mon_min - self.last_opted_in_ceph_version > 0: + self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}") + return True + + return False + + def is_opted_in(self) -> bool: + # If len is 0 it means that the user is either opted-out (never + # opted-in, or invoked `telemetry off`), or they upgraded from a + # telemetry revision 1 or 2, which required to re-opt in to revision 3, + # regardless, hence is considered as opted-out + return len(self.db_collection) > 0 + + def should_nag(self) -> bool: +
# Find delta between opted-in collections and module collections; + # nag only if module has a collection which is not in db, and nag == True. + + # We currently do not nag if the user is opted-out (or never opted-in). + # If we wish to do this in the future, we need to have a tri-mode state + # (opted in, opted out, no action yet), and it needs to be guarded by a + # config option (so that nagging can be turned off via config). + # We also need to add a last_opted_out_ceph_version variable, for the + # major upgrade check. + + # check if there are collections the user is not opt-in to + # that we should nag about + for c in MODULE_COLLECTION: + for k, v in c.items(): + if k == 'name' and v.name not in self.db_collection: + if c['nag'] == True: + self.log.debug(f"The collection: {v} is not reported, and we should nag about it") + return True + + # user might be opted-in to the most recent collection, or there is no + # new collection which requires nagging about + return self.is_major_upgrade() + + def init_collection(self) -> None: + # We fetch from db the collections the user had already opted-in to. + # During the transition the results will be empty, but the user might + # be opted-in to an older version (e.g. 
revision = 3) + + collection = self.get_store('collection') + + if collection is not None: + self.db_collection = json.loads(collection) + + if self.db_collection is None: + # happens once on upgrade + if not self.enabled: + # user is not opted-in + self.set_store('collection', json.dumps([])) + self.log.debug("user is not opted-in") + else: + # user is opted-in, verify the revision: + if self.last_opt_revision == REVISION: + self.log.debug(f"telemetry revision is {REVISION}") + base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name] + self.set_store('collection', json.dumps(base_collection)) + else: + # user is opted-in to an older version, meaning they need + # to re-opt in regardless + self.set_store('collection', json.dumps([])) + self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in") + + # reload collection after setting + collection = self.get_store('collection') + self.db_collection = json.loads(collection) + else: + # user has already upgraded + self.log.debug(f"user has upgraded already: collection: {self.db_collection}") + + def is_enabled_collection(self, collection: Collection) -> bool: + return collection.name in self.db_collection + + def opt_in_all_collections(self) -> None: + """ + Opt-in to all collections; Update db with the currently available collections in the module + """ + for c in MODULE_COLLECTION: + for k, v in c.items(): + if k == 'name' and v.name not in self.db_collection: + self.db_collection.append(v.name) + + self.set_store('collection', json.dumps(self.db_collection)) + def send(self, report: Dict[str, Dict[str, str]], endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]: @@ -1058,6 +1262,45 @@ class Module(MgrModule): return 1, '', '\n'.join(success + failed) return 0, '', '\n'.join(success) + def format_perf_histogram(self, report): + # Formatting the perf histograms so they are human-readable. 
This will change the + # ranges and values, which are currently in list form, into strings so that + # they are displayed horizontally instead of vertically. + try: + # Formatting ranges and values in osd_perf_histograms + modes_to_be_formatted = ['osd_perf_histograms'] + for mode in modes_to_be_formatted: + for config in report[mode]: + for histogram in config: + # Adjust ranges by converting lists into strings + for axis in config[histogram]['axes']: + for i in range(0, len(axis['ranges'])): + axis['ranges'][i] = str(axis['ranges'][i]) + # Adjust values by converting lists into strings + if mode == 'osd_perf_histograms_aggregated': + for i in range(0, len(config[histogram]['values'])): + config[histogram]['values'][i] = str(config[histogram]['values'][i]) + else: # if mode == 'osd_perf_histograms_separated' + for osd in config[histogram]['osds']: + for i in range(0, len(osd['values'])): + osd['values'][i] = str(osd['values'][i]) + except KeyError: + # If the perf channel is not enabled, there should be a KeyError since + # 'osd_perf_histograms' would not be present in the report. In that case, + # the show function should pass as usual without trying to format the + # histograms.
+ pass + + + def restore_default_opt_setting(self, opt_name) -> None: + for o in self.MODULE_OPTIONS: + if o['name'] == opt_name: + default_val = o.get('default', None) + self.set_module_option(opt_name, default_val) + setattr(self, + opt_name, + default_val) + @CLIReadCommand('telemetry status') def status(self) -> Tuple[int, str, str]: ''' @@ -1070,6 +1313,28 @@ class Module(MgrModule): if self.last_upload else self.last_upload) return 0, json.dumps(r, indent=4, sort_keys=True), '' + @CLIReadCommand('telemetry diff') + def diff(self) -> Tuple[int, str, str]: + ''' + Show the diff between opted-in collection and available collection + ''' + diff = [] + keys = ['nag'] + + for c in MODULE_COLLECTION: + for k, v in c.items(): + if k == 'name': + if not self.is_enabled_collection(v): + diff.append({key: val for key, val in c.items() if key not in keys}) + + r = None + if diff == []: + r = "Telemetry is up to date" + else: + r = json.dumps(diff, indent=4, sort_keys=True) + + return 0, r, '' + @CLICommand('telemetry on') def on(self, license: Optional[str] = None) -> Tuple[int, str, str]: ''' @@ -1080,23 +1345,63 @@ class Module(MgrModule): To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.''' else: self.set_module_option('enabled', True) - self.set_module_option('last_opt_revision', REVISION) - return 0, '', '' + self.enabled = True + self.opt_in_all_collections() + + # for major releases upgrade nagging + mon_map = self.get('mon_map') + mon_min = mon_map.get("min_mon_release", 0) + self.set_store('last_opted_in_ceph_version', str(mon_min)) + self.last_opted_in_ceph_version = mon_min + + msg = 'Telemetry is on.' 
+ disabled_channels = '' + active_channels = self.get_active_channels() + for c in ALL_CHANNELS: + if c not in active_channels and c != 'ident': + disabled_channels = f"{disabled_channels} {c}" + + if len(disabled_channels) > 0: + msg = f"{msg}\nSome channels are disabled, please enable with:\n"\ + f"`ceph telemetry enable channel{disabled_channels}`" + + return 0, msg, '' @CLICommand('telemetry off') def off(self) -> Tuple[int, str, str]: ''' Disable telemetry reports from this cluster ''' + if not self.enabled: + # telemetry is already off + msg = 'Telemetry is currently not enabled, nothing to turn off. '\ + 'Please consider opting-in with `ceph telemetry on`.\n' \ + 'Preview sample reports with `ceph telemetry preview`.' + return 0, msg, '' + self.set_module_option('enabled', False) - self.set_module_option('last_opt_revision', 1) - return 0, '', '' + self.enabled = False + self.set_store('collection', json.dumps([])) + self.db_collection = [] + + # we might need this info in the future, in case + # of nagging when user is opted-out + mon_map = self.get('mon_map') + mon_min = mon_map.get("min_mon_release", 0) + self.set_store('last_opted_out_ceph_version', str(mon_min)) + self.last_opted_out_ceph_version = mon_min + + for c in ALL_CHANNELS: + self.restore_default_opt_setting(f"channel_{c}") + + msg = 'Telemetry is now disabled. Channels settings are restored to default.' + return 0, msg, '' @CLICommand('telemetry send') def do_send(self, endpoint: Optional[List[EndPoint]] = None, license: Optional[str] = None) -> Tuple[int, str, str]: - if self.last_opt_revision < LAST_REVISION_RE_OPT_IN and license != LICENSE: + if not self.is_opted_in() and license != LICENSE: self.log.debug(('A telemetry send attempt while opted-out. ' 'Asking for license agreement')) return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}). 
@@ -1109,38 +1414,54 @@ Please consider enabling the telemetry module with 'ceph telemetry on'.''' @CLIReadCommand('telemetry show') def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: ''' - Show report of all channels + Show a sample report of all enabled channels (except for 'device' channel) ''' - report = self.get_report(channels=channels) + if not self.enabled: + # if telemetry is off, no report is being sent, hence nothing to show + msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \ + 'Preview sample reports with `ceph telemetry preview`.' + return 0, msg, '' + + report = self.get_report_locked(channels=channels) + self.format_perf_histogram(report) + report = json.dumps(report, indent=4, sort_keys=True) - # Formatting the perf histograms so they are human-readable. This will change the - # ranges and values, which are currently in list form, into strings so that - # they are displayed horizontally instead of vertically. - try: - # Formatting ranges and values in osd_perf_histograms - modes_to_be_formatted = ['osd_perf_histograms'] - for mode in modes_to_be_formatted: - for config in report[mode]: - for histogram in config: - # Adjust ranges by converting lists into strings - for axis in config[histogram]['axes']: - for i in range(0, len(axis['ranges'])): - axis['ranges'][i] = str(axis['ranges'][i]) - # Adjust values by converting lists into strings - if mode == 'osd_perf_histograms_aggregated': - for i in range(0, len(config[histogram]['values'])): - config[histogram]['values'][i] = str(config[histogram]['values'][i]) - else: # if mode == 'osd_perf_histograms_separated' - for osd in config[histogram]['osds']: - for i in range(0, len(osd['values'])): - osd['values'][i] = str(osd['values'][i]) - except KeyError: - # If the perf channel is not enabled, there should be a KeyError since - # 'osd_perf_histograms' would not be present in the report. 
In that case, - # the show function should pass as usual without trying to format the - # histograms. - pass + if self.channel_device: + report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.''' + return 0, report, '' + + @CLIReadCommand('telemetry preview') + def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]: + ''' + Preview a sample report of the most recent collections available + ''' + report = {} + + # We use a lock to prevent a scenario where the user wishes to preview + # the report, and at the same time the module hits the interval of + # sending a report with the opted-in collection, which has less data + # than in the preview report. + with self.get_report_lock: + if len(self.collection_delta()) == 0: + # user is already opted-in to the most recent collection + msg = 'Telemetry is up to date, see report with `ceph telemetry show`' + return 0, msg, '' + else: + # there are collections the user is not opted-in to + next_collection = [c['name'].name for c in MODULE_COLLECTION] + + opted_in_collection = self.db_collection + self.db_collection = next_collection + try: + report = self.get_report(channels=channels) + finally: + # restore the opted-in collections even if get_report() + # raises; otherwise db_collection would keep listing + # collections the user never opted-in to + self.db_collection = opted_in_collection + + self.format_perf_histogram(report) report = json.dumps(report, indent=4, sort_keys=True) if self.channel_device: report += ''' @@ -1150,11 +1471,20 @@ Device report is generated separately.
To see it run 'ceph telemetry show-device @CLIReadCommand('telemetry show-device') def show_device(self) -> Tuple[int, str, str]: - return 0, json.dumps(self.get_report('device'), indent=4, sort_keys=True), '' + return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), '' @CLIReadCommand('telemetry show-all') def show_all(self) -> Tuple[int, str, str]: - return 0, json.dumps(self.get_report('all'), indent=4, sort_keys=True), '' + return 0, json.dumps(self.get_report_locked('all'), indent=4, sort_keys=True), '' + + def get_report_locked(self, + report_type: str = 'default', + channels: Optional[List[str]] = None) -> Dict[str, Any]: + ''' + A wrapper around get_report to allow for compiling a report of the most recent module collections + ''' + with self.get_report_lock: + return self.get_report(report_type, channels) def get_report(self, report_type: str = 'default', @@ -1182,12 +1512,13 @@ Device report is generated separately. To see it run 'ceph telemetry show-device def refresh_health_checks(self) -> None: health_checks = {} - if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN: + # TODO do we want to nag also in case the user is not opted-in? + if self.enabled and self.should_nag(): health_checks['TELEMETRY_CHANGED'] = { 'severity': 'warning', 'summary': 'Telemetry requires re-opt-in', 'detail': [ - 'telemetry report includes new information; must re-opt-in (or out)' + 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`' ] } self.set_health_checks(health_checks) @@ -1204,7 +1535,7 @@ Device report is generated separately. To see it run 'ceph telemetry show-device self.refresh_health_checks() - if self.last_opt_revision < LAST_REVISION_RE_OPT_IN: + if not self.is_opted_in(): self.log.debug('Not sending report until user re-opts-in') self.event.wait(1800) continue @@ -1222,9 +1553,11 @@ Device report is generated separately. 
To see it run 'ceph telemetry show-device try: self.last_report = self.compile_report() except Exception: + # TODO add the exception here self.log.exception('Exception while compiling report:') self.send(self.last_report) + self.log.debug("SENDING REPORT") else: self.log.debug('Interval for sending new report has not expired') -- 2.39.5