import uuid
import time
from datetime import datetime, timedelta
-from threading import Event
+from threading import Event, Lock
from collections import defaultdict
from typing import cast, Any, DefaultDict, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
LICENSE_NAME = 'Community Data License Agreement - Sharing - Version 1.0'
LICENSE_URL = 'https://cdla.io/sharing-1-0/'
-# If the telemetry revision has changed since this point, re-require
-# an opt-in. This should happen each time we add new information to
-# the telemetry report.
-LAST_REVISION_RE_OPT_IN = 2
-
# Latest revision of the telemetry report. Bump this each time we make
# *any* change.
REVISION = 3
# - rgw daemons, zones, zonegroups; which rgw frontends
# - crush map stats
+class Collection(str, enum.Enum):
+ basic_base = 'basic_base'
+ device_base = 'device_base'
+ crash_base = 'crash_base'
+ ident_base = 'ident_base'
+ perf_perf = 'perf_perf'
+ basic_mds_metadata = 'basic_mds_metadata'
+
+MODULE_COLLECTION = [
+ {
+ "name": Collection.basic_base,
+ "description": "Basic information about the cluster (capacity, number and type of daemons, version, etc.)",
+ "channel": "basic",
+ "nag": False
+ },
+ {
+ "name": Collection.device_base,
+ "description": "Information about device health metrics",
+ "channel": "device",
+ "nag": False
+ },
+ {
+ "name": Collection.crash_base,
+ "description": "Information about daemon crashes (daemon type and version, backtrace, etc.)",
+ "channel": "crash",
+ "nag": False
+ },
+ {
+ "name": Collection.ident_base,
+ "description": "User-provided identifying information about the cluster",
+ "channel": "ident",
+ "nag": False
+ },
+ {
+ "name": Collection.perf_perf,
+ "description": "Information about performance counters of the cluster",
+ "channel": "perf",
+ "nag": True
+ },
+ {
+ "name": Collection.basic_mds_metadata,
+ "description": "MDS metadata",
+ "channel": "basic",
+ "nag": False
+ }
+]
+
class Module(MgrModule):
metadata_keys = [
"arch",
super(Module, self).__init__(*args, **kwargs)
self.event = Event()
self.run = False
+ self.db_collection: List[str] = None
+ self.last_opted_in_ceph_version: int = None
+ self.last_opted_out_ceph_version: int = None
self.last_upload: Optional[int] = None
self.last_report: Dict[str, Any] = dict()
self.report_id: Optional[str] = None
self.salt: Optional[str] = None
+ self.get_report_lock = Lock()
self.config_update_module_option()
# for mypy which does not run the code
if TYPE_CHECKING:
self.channel_crash = True
self.channel_device = True
self.channel_perf = False
+ self.db_collection = ['basic_base', 'device_base']
+ self.last_opted_in_ceph_version = 17
+ self.last_opted_out_ceph_version = 0
def config_update_module_option(self) -> None:
for opt in self.MODULE_OPTIONS:
else:
self.salt = salt
+ self.init_collection()
+
+ last_opted_in_ceph_version = self.get_store('last_opted_in_ceph_version', None)
+ if last_opted_in_ceph_version is None:
+ self.last_opted_in_ceph_version = None
+ else:
+ self.last_opted_in_ceph_version = int(last_opted_in_ceph_version)
+
+ last_opted_out_ceph_version = self.get_store('last_opted_out_ceph_version', None)
+ if last_opted_out_ceph_version is None:
+ self.last_opted_out_ceph_version = None
+ else:
+ self.last_opted_out_ceph_version = int(last_opted_out_ceph_version)
+
def gather_osd_metadata(self,
osd_map: Dict[str, List[Dict[str, int]]]) -> Dict[str, Dict[str, int]]:
keys = ["osd_objectstore", "rotational"]
return metadata
+ def gather_mds_metadata(self) -> Dict[str, Dict[str, int]]:
+ metadata: Dict[str, Dict[str, int]] = dict()
+
+ res = self.get('mds_metadata') # metadata of *all* mds daemons
+ if res is None or not res:
+ self.log.debug('Could not get metadata for mds daemons')
+ return metadata
+
+ keys = list()
+ keys += self.metadata_keys
+
+ for key in keys:
+ metadata[key] = defaultdict(int)
+
+ for mds in res.values():
+ for k, v in mds.items():
+ if k not in keys:
+ continue
+
+ metadata[k][v] += 1
+
+ return metadata
+
def gather_crush_info(self) -> Dict[str, Union[int,
bool,
List[int],
for collection in all_perf_counters[daemon]:
# Split the collection to avoid redundancy in final report; i.e.:
- # bluestore.kv_flush_lat, bluestore.kv_final_lat -->
+ # bluestore.kv_flush_lat, bluestore.kv_final_lat -->
# bluestore: kv_flush_lat, kv_final_lat
col_0, col_1 = collection.split('.')
'channels': channels,
'channels_available': ALL_CHANNELS,
'license': LICENSE,
+ 'collections_available': [c['name'].name for c in MODULE_COLLECTION],
+ 'collections_opted_in': [c['name'].name for c in MODULE_COLLECTION if self.is_enabled_collection(c['name'])],
}
if 'ident' in channels:
report['fs']['total_num_mds'] = num_mds # type: ignore
# daemons
- report['metadata'] = dict(osd=self.gather_osd_metadata(osd_map),
- mon=self.gather_mon_metadata(mon_map))
+ report['metadata'] = dict()
+ report['metadata']['osd'] = self.gather_osd_metadata(osd_map)
+ report['metadata']['mon'] = self.gather_mon_metadata(mon_map)
+
+ if self.is_enabled_collection(Collection.basic_mds_metadata):
+ report['metadata']['mds'] = self.gather_mds_metadata()
# host counts
servers = self.list_servers()
report['crashes'] = self.gather_crashinfo()
if 'perf' in channels:
- report['perf_counters'] = self.gather_perf_counters('separated')
- report['stats_per_pool'] = self.get_stats_per_pool()
- report['stats_per_pg'] = self.get_stats_per_pg()
- report['io_rate'] = self.get_io_rate()
- report['osd_perf_histograms'] = self.get_osd_histograms('separated')
- report['mempool'] = self.get_mempool('separated')
- report['heap_stats'] = self.get_heap_stats()
- report['rocksdb_stats'] = self.get_rocksdb_stats()
+ if self.is_enabled_collection(Collection.perf):
+ report['perf_counters'] = self.gather_perf_counters('separated')
+ report['stats_per_pool'] = self.get_stats_per_pool()
+ report['stats_per_pg'] = self.get_stats_per_pg()
+ report['io_rate'] = self.get_io_rate()
+ report['osd_perf_histograms'] = self.get_osd_histograms('separated')
+ report['mempool'] = self.get_mempool('separated')
+ report['heap_stats'] = self.get_heap_stats()
+ report['rocksdb_stats'] = self.get_rocksdb_stats()
# NOTE: We do not include the 'device' channel in this report; it is
# sent to a different endpoint.
ceph = 'ceph'
device = 'device'
+ def collection_delta(self) -> List[Collection]:
+ '''
+ Find collections that are available in the module, but are not in the db
+ '''
+ new_collection : List[Collection] = []
+
+ for c in MODULE_COLLECTION:
+ for k, v in c.items():
+ if k == 'name' and v.name not in self.db_collection:
+ new_collection.append(v)
+
+ return new_collection
+
+ def is_major_upgrade(self) -> bool:
+ '''
+ Returns True only if the user last opted-in to an older major
+ '''
+ if self.last_opted_in_ceph_version is None or self.last_opted_in_ceph_version == 0:
+ # we do not know what Ceph version was when the user last opted-in,
+ # thus we do not wish to nag in case of a major upgrade
+ return False
+
+ mon_map = self.get('mon_map')
+ mon_min = mon_map.get("min_mon_release", 0)
+
+ if mon_min - self.last_opted_in_ceph_version > 0:
+ self.log.debug(f"major upgrade: mon_min is: {mon_min} and user last opted-in in {self.last_opted_in_ceph_version}")
+ return True
+
+ return False
+
+ def is_opted_in(self) -> bool:
+ # If len is 0 it means that the user is either opted-out (never
+ # opted-in, or invoked `telemetry off`), or they upgraded from a
+ # telemetry revision 1 or 2, which required to re-opt in to revision 3,
+ # regardless, hence is considered as opted-out
+ return len(self.db_collection) > 0
+
+ def should_nag(self) -> bool:
+ # Find delta between opted-in collections and module collections;
+ # nag only if module has a collection which is not in db, and nag == True.
+
+ # We currently do not nag if the user is opted-out (or never opted-in).
+ # If we wish to do this in the future, we need to have a tri-mode state
+ # (opted in, opted out, no action yet), and it needs to be guarded by a
+ # config option (so that nagging can be turned off via config).
+ # We also need to add a last_opted_out_ceph_version variable, for the
+ # major upgrade check.
+
+ # check if there are collections the user is not opt-in to
+ # that we should nag about
+ for c in MODULE_COLLECTION:
+ for k, v in c.items():
+ if k == 'name' and v.name not in self.db_collection:
+ if c['nag'] == True:
+ self.log.debug(f"The collection: {v} is not reported, and we should nag about it")
+ return True
+
+ # user might be opted-in to the most recent collection, or there is no
+ # new collection which requires nagging about
+ return self.is_major_upgrade()
+
+ def init_collection(self) -> None:
+ # We fetch from db the collections the user had already opted-in to.
+ # During the transition the results will be empty, but the user might
+ # be opted-in to an older version (e.g. revision = 3)
+
+ collection = self.get_store('collection')
+
+ if collection is not None:
+ self.db_collection = json.loads(collection)
+
+ if self.db_collection is None:
+ # happens once on upgrade
+ if not self.enabled:
+ # user is not opted-in
+ self.set_store('collection', json.dumps([]))
+ self.log.debug("user is not opted-in")
+ else:
+ # user is opted-in, verify the revision:
+ if self.last_opt_revision == REVISION:
+ self.log.debug(f"telemetry revision is {REVISION}")
+ base_collection = [Collection.basic_base.name, Collection.device_base.name, Collection.crash_base.name, Collection.ident_base.name]
+ self.set_store('collection', json.dumps(base_collection))
+ else:
+ # user is opted-in to an older version, meaning they need
+ # to re-opt in regardless
+ self.set_store('collection', json.dumps([]))
+ self.log.debug(f"user is opted-in but revision is old ({self.last_opt_revision}), needs to re-opt-in")
+
+ # reload collection after setting
+ collection = self.get_store('collection')
+ self.db_collection = json.loads(collection)
+ else:
+ # user has already upgraded
+ self.log.debug(f"user has upgraded already: collection: {self.db_collection}")
+
+ def is_enabled_collection(self, collection: Collection) -> bool:
+ return collection.name in self.db_collection
+
+ def opt_in_all_collections(self) -> None:
+ """
+ Opt-in to all collections; Update db with the currently available collections in the module
+ """
+ for c in MODULE_COLLECTION:
+ for k, v in c.items():
+ if k == 'name' and v.name not in self.db_collection:
+ self.db_collection.append(v.name)
+
+ self.set_store('collection', json.dumps(self.db_collection))
+
def send(self,
report: Dict[str, Dict[str, str]],
endpoint: Optional[List[EndPoint]] = None) -> Tuple[int, str, str]:
return 1, '', '\n'.join(success + failed)
return 0, '', '\n'.join(success)
+ def format_perf_histogram(self, report):
+ # Formatting the perf histograms so they are human-readable. This will change the
+ # ranges and values, which are currently in list form, into strings so that
+ # they are displayed horizontally instead of vertically.
+ try:
+ # Formatting ranges and values in osd_perf_histograms
+ modes_to_be_formatted = ['osd_perf_histograms_aggregated', 'osd_perf_histograms_separated']
+ for mode in modes_to_be_formatted:
+ for config in report[mode]:
+ for histogram in config:
+ # Adjust ranges by converting lists into strings
+ for axis in config[histogram]['axes']:
+ for i in range(0, len(axis['ranges'])):
+ axis['ranges'][i] = str(axis['ranges'][i])
+ # Adjust values by converting lists into strings
+ if mode == 'osd_perf_histograms_aggregated':
+ for i in range(0, len(config[histogram]['values'])):
+ config[histogram]['values'][i] = str(config[histogram]['values'][i])
+ else: # if mode == 'osd_perf_histograms_separated'
+ for osd in config[histogram]['osds']:
+ for i in range(0, len(osd['values'])):
+ osd['values'][i] = str(osd['values'][i])
+ except KeyError:
+ # If the perf channel is not enabled, there should be a KeyError since
+ # 'osd_perf_histograms' would not be present in the report. In that case,
+ # the show function should pass as usual without trying to format the
+ # histograms.
+ pass
+
+
+ def restore_default_opt_setting(self, opt_name) -> None:
+ for o in self.MODULE_OPTIONS:
+ if o['name'] == opt_name:
+ default_val = o.get('default', None)
+ self.set_module_option(opt_name, default_val)
+ setattr(self,
+ opt_name,
+ default_val)
+
@CLIReadCommand('telemetry status')
def status(self) -> Tuple[int, str, str]:
'''
if self.last_upload else self.last_upload)
return 0, json.dumps(r, indent=4, sort_keys=True), ''
+ @CLIReadCommand('telemetry diff')
+ def diff(self) -> Tuple[int, str, str]:
+ '''
+ Show the diff between opted-in collection and available collection
+ '''
+ diff = []
+ keys = ['nag']
+
+ for c in MODULE_COLLECTION:
+ for k, v in c.items():
+ if k == 'name':
+ if not self.is_enabled_collection(v):
+ diff.append({key: val for key, val in c.items() if key not in keys})
+
+ r = None
+ if diff == []:
+ r = "Telemetry is up to date"
+ else:
+ r = json.dumps(diff, indent=4, sort_keys=True)
+
+ return 0, r, ''
+
@CLICommand('telemetry on')
def on(self, license: Optional[str] = None) -> Tuple[int, str, str]:
'''
To enable, add '--license {LICENSE}' to the 'ceph telemetry on' command.'''
else:
self.set_module_option('enabled', True)
- self.set_module_option('last_opt_revision', REVISION)
- return 0, '', ''
+ self.enabled = True
+ self.opt_in_all_collections()
+
+ # for major releases upgrade nagging
+ mon_map = self.get('mon_map')
+ mon_min = mon_map.get("min_mon_release", 0)
+ self.set_store('last_opted_in_ceph_version', str(mon_min))
+ self.last_opted_in_ceph_version = mon_min
+
+ msg = 'Telemetry is on.'
+ disabled_channels = ''
+ active_channels = self.get_active_channels()
+ for c in ALL_CHANNELS:
+ if c not in active_channels and c != 'ident':
+ disabled_channels = f"{disabled_channels} {c}"
+
+ if len(disabled_channels) > 0:
+ msg = f"{msg}\nSome channels are disabled, please enable with:\n"\
+ f"`ceph telemetry enable channel{disabled_channels}`"
+
+ return 0, msg, ''
@CLICommand('telemetry off')
def off(self) -> Tuple[int, str, str]:
'''
Disable telemetry reports from this cluster
'''
+ if not self.enabled:
+ # telemetry is already off
+ msg = 'Telemetry is currently not enabled, nothing to turn off. '\
+ 'Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample reports with `ceph telemetry preview`.'
+ return 0, msg, ''
+
self.set_module_option('enabled', False)
- self.set_module_option('last_opt_revision', 1)
- return 0, '', ''
+ self.enabled = False
+ self.set_store('collection', json.dumps([]))
+ self.db_collection = []
+
+ # we might need this info in the future, in case
+ # of nagging when user is opted-out
+ mon_map = self.get('mon_map')
+ mon_min = mon_map.get("min_mon_release", 0)
+ self.set_store('last_opted_out_ceph_version', str(mon_min))
+ self.last_opted_out_ceph_version = mon_min
+
+ for c in ALL_CHANNELS:
+ self.restore_default_opt_setting(f"channel_{c}")
+
+ msg = 'Telemetry is now disabled. Channels settings are restored to default.'
+ return 0, msg, ''
@CLICommand('telemetry send')
def do_send(self,
endpoint: Optional[List[EndPoint]] = None,
license: Optional[str] = None) -> Tuple[int, str, str]:
- if self.last_opt_revision < LAST_REVISION_RE_OPT_IN and license != LICENSE:
+ if not self.is_opted_in() and license != LICENSE:
self.log.debug(('A telemetry send attempt while opted-out. '
'Asking for license agreement'))
return -errno.EPERM, '', f'''Telemetry data is licensed under the {LICENSE_NAME} ({LICENSE_URL}).
@CLIReadCommand('telemetry show')
def show(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
'''
- Show report of all channels
+ Show a sample report of all enabled channels (except for 'device' channel)
'''
- report = self.get_report(channels=channels)
+ if not self.enabled:
+ # if telemetry is off, no report is being sent, hence nothing to show
+ msg = 'Telemetry is off. Please consider opting-in with `ceph telemetry on`.\n' \
+ 'Preview sample reports with `ceph telemetry preview`.'
+ return 0, msg, ''
+
+ report = self.get_report_locked(channels=channels)
+ self.format_perf_histogram(report)
+ report = json.dumps(report, indent=4, sort_keys=True)
- # Formatting the perf histograms so they are human-readable. This will change the
- # ranges and values, which are currently in list form, into strings so that
- # they are displayed horizontally instead of vertically.
- try:
- # Formatting ranges and values in osd_perf_histograms
- modes_to_be_formatted = ['osd_perf_histograms']
- for mode in modes_to_be_formatted:
- for config in report[mode]:
- for histogram in config:
- # Adjust ranges by converting lists into strings
- for axis in config[histogram]['axes']:
- for i in range(0, len(axis['ranges'])):
- axis['ranges'][i] = str(axis['ranges'][i])
- # Adjust values by converting lists into strings
- if mode == 'osd_perf_histograms_aggregated':
- for i in range(0, len(config[histogram]['values'])):
- config[histogram]['values'][i] = str(config[histogram]['values'][i])
- else: # if mode == 'osd_perf_histograms_separated'
- for osd in config[histogram]['osds']:
- for i in range(0, len(osd['values'])):
- osd['values'][i] = str(osd['values'][i])
- except KeyError:
- # If the perf channel is not enabled, there should be a KeyError since
- # 'osd_perf_histograms' would not be present in the report. In that case,
- # the show function should pass as usual without trying to format the
- # histograms.
- pass
+ if self.channel_device:
+ report += '''\nDevice report is generated separately. To see it run 'ceph telemetry show-device'.'''
+ return 0, report, ''
+
+ @CLIReadCommand('telemetry preview')
+ def preview(self, channels: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ '''
+ Preview a sample report of the most recent collections available
+ '''
+ report = {}
+
+ # We use a lock to prevent a scenario where the user wishes to preview
+ # the report, and at the same time the module hits the interval of
+ # sending a report with the opted-in collection, which has less data
+ # than in the preview report.
+ with self.get_report_lock:
+ if len(self.collection_delta()) == 0:
+ # user is already opted-in to the most recent collection
+ msg = 'Telemetry is up to date, see report with `ceph telemetry show`'
+ return 0, msg, ''
+ else:
+ # there are collections the user is not opted-in to
+ next_collection = []
+
+ for c in MODULE_COLLECTION:
+ for k, v in c.items():
+ if k == 'name':
+ next_collection.append(v.name)
+
+ opted_in_collection = self.db_collection
+ self.db_collection = next_collection
+ report = self.get_report(channels=channels)
+ self.db_collection = opted_in_collection
+
+ self.format_perf_histogram(report)
report = json.dumps(report, indent=4, sort_keys=True)
if self.channel_device:
report += '''
@CLIReadCommand('telemetry show-device')
def show_device(self) -> Tuple[int, str, str]:
- return 0, json.dumps(self.get_report('device'), indent=4, sort_keys=True), ''
+ return 0, json.dumps(self.get_report_locked('device'), indent=4, sort_keys=True), ''
@CLIReadCommand('telemetry show-all')
def show_all(self) -> Tuple[int, str, str]:
- return 0, json.dumps(self.get_report('all'), indent=4, sort_keys=True), ''
+ return 0, json.dumps(self.get_report_locked('all'), indent=4, sort_keys=True), ''
+
+ def get_report_locked(self,
+ report_type: str = 'default',
+ channels: Optional[List[str]] = None) -> Dict[str, Any]:
+ '''
+ A wrapper around get_report to allow for compiling a report of the most recent module collections
+ '''
+ with self.get_report_lock:
+ return self.get_report(report_type, channels)
def get_report(self,
report_type: str = 'default',
def refresh_health_checks(self) -> None:
health_checks = {}
- if self.enabled and self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
+ # TODO do we want to nag also in case the user is not opted-in?
+ if self.enabled and self.should_nag():
health_checks['TELEMETRY_CHANGED'] = {
'severity': 'warning',
'summary': 'Telemetry requires re-opt-in',
'detail': [
- 'telemetry report includes new information; must re-opt-in (or out)'
+ 'telemetry module includes new collections; please re-opt-in to new collections with `ceph telemetry on`'
]
}
self.set_health_checks(health_checks)
self.refresh_health_checks()
- if self.last_opt_revision < LAST_REVISION_RE_OPT_IN:
+ if not self.is_opted_in():
self.log.debug('Not sending report until user re-opts-in')
self.event.wait(1800)
continue
try:
self.last_report = self.compile_report()
except Exception:
+ # TODO add the exception here
self.log.exception('Exception while compiling report:')
self.send(self.last_report)
+ self.log.debug("SENDING REPORT")
else:
self.log.debug('Interval for sending new report has not expired')