From 1cd4edb6a675e816fc8a555fab141715aee6d070 Mon Sep 17 00:00:00 2001 From: Laura Flores Date: Tue, 5 Jul 2022 22:06:15 +0000 Subject: [PATCH] mgr/telemetry: add `perf_memory_metrics` collection to telemetry This new collection includes heap stats and mempool metrics for mon and mds daemons. A `tell_command` function was introduced to the mgr module as a wrapper around the `send_command` function to make it easier to run "tell" admin socket commands. Signed-off-by: Laura Flores --- src/pybind/mgr/mgr_module.py | 25 +++++ src/pybind/mgr/telemetry/module.py | 159 +++++++++++++++++++---------- 2 files changed, 132 insertions(+), 52 deletions(-) diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index a45282283217c..fc6b55ad0b9c3 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -1640,6 +1640,31 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return r + def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]: + """ + Helper for `ceph tell` command execution. + + See send_command for general case. + + :param dict cmd_dict: expects a prefix i.e.: + cmd_dict = { + 'prefix': 'heap', + 'heapcmd': 'stats', + } + :return: status int, out std, err str + """ + t1 = time.time() + result = CommandResult() + self.send_command(result, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf) + r = result.wait() + t2 = time.time() + + self.log.debug("tell_command on {0}.{1}: '{2}' -> {3} in {4:.5f}s".format( + daemon_type, daemon_id, cmd_dict['prefix'], r[0], t2 - t1 + )) + + return r + def send_command( self, result: CommandResult, diff --git a/src/pybind/mgr/telemetry/module.py b/src/pybind/mgr/telemetry/module.py index ea97df47741ba..394dcd565130c 100644 --- a/src/pybind/mgr/telemetry/module.py +++ b/src/pybind/mgr/telemetry/module.py @@ -69,6 +69,7 @@ class Collection(str, enum.Enum): basic_pool_usage = 'basic_pool_usage' basic_usage_by_class = 'basic_usage_by_class' basic_rook_v01 = 'basic_rook_v01' + perf_memory_metrics = 'perf_memory_metrics' MODULE_COLLECTION : List[Dict] = [ { @@ -125,6 +126,12 @@ MODULE_COLLECTION : List[Dict] = [ "channel": "basic", "nag": True }, + { + "name": Collection.perf_memory_metrics, + "description": "Heap stats and mempools for mon and mds", + "channel": "perf", + "nag": False + }, ] ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [ @@ -467,66 +474,106 @@ class Module(MgrModule): return etype + '.' + m.hexdigest() def get_heap_stats(self) -> Dict[str, dict]: - # Initialize result dict - result: Dict[str, dict] = defaultdict(lambda: defaultdict(int)) - - # Get list of osd ids from the metadata - osd_metadata = self.get('osd_metadata') + result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) + anonymized_daemons = {} + osd_map = self.get('osd_map') - # Grab output from the "osd.x heap stats" command - for osd_id in osd_metadata: - cmd_dict = { - 'prefix': 'heap', - 'heapcmd': 'stats', - 'id': str(osd_id), - } - r, outb, outs = self.osd_command(cmd_dict) - if r != 0: - self.log.debug("Invalid command dictionary.") - continue + # Combine available daemons + daemons = [] + for osd in osd_map['osds']: + daemons.append('osd'+'.'+str(osd['osd'])) + # perf_memory_metrics collection (1/2) + if self.is_enabled_collection(Collection.perf_memory_metrics): + mon_map = self.get('mon_map') + mds_metadata = self.get('mds_metadata') + for mon in mon_map['mons']: + daemons.append('mon'+'.'+mon['name']) + for mds in mds_metadata: + daemons.append('mds'+'.'+mds) + + # Grab output from the "daemon.x heap stats" command + for daemon in daemons: + daemon_type, daemon_id = daemon.split('.') + heap_stats = self.parse_heap_stats(daemon_type, daemon_id) + if heap_stats: + if (daemon_type != 'osd'): + # Anonymize mon and mds + anonymized_daemons[daemon] = self.anonymize_entity_name(daemon) + daemon = anonymized_daemons[daemon] + result[daemon_type][daemon] = heap_stats else: - if 'tcmalloc heap stats' in outs: - values = [int(i) for i in outs.split() if i.isdigit()] - # `categories` must be ordered this way for the correct output to be parsed - categories = ['use_by_application', - 'page_heap_freelist', - 'central_cache_freelist', - 'transfer_cache_freelist', - 'thread_cache_freelists', - 'malloc_metadata', - 'actual_memory_used', - 'released_to_os', - 'virtual_address_space_used', - 'spans_in_use', - 'thread_heaps_in_use', - 'tcmalloc_page_size'] - if len(values) != len(categories): - self.log.debug('Received unexpected output from osd.{}; number of values should match the number of expected categories:\n' \ - 'values: len={} {} ~ categories: len={} {} ~ outs: {}'.format(osd_id, len(values), values, len(categories), categories, outs)) - continue - osd = 'osd.' + str(osd_id) - result[osd] = dict(zip(categories, values)) - else: - self.log.debug('No heap stats available on osd.{}: {}'.format(osd_id, outs)) - continue + continue + if anonymized_daemons: + # for debugging purposes only, this data is never reported + self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons)) return result + def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]: + parsed_output = {} + + cmd_dict = { + 'prefix': 'heap', + 'heapcmd': 'stats' + } + r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict) + + if r != 0: + self.log.debug("Invalid command dictionary.") + else: + if 'tcmalloc heap stats' in outs: + values = [int(i) for i in outs.split() if i.isdigit()] + # `categories` must be ordered this way for the correct output to be parsed + categories = ['use_by_application', + 'page_heap_freelist', + 'central_cache_freelist', + 'transfer_cache_freelist', + 'thread_cache_freelists', + 'malloc_metadata', + 'actual_memory_used', + 'released_to_os', + 'virtual_address_space_used', + 'spans_in_use', + 'thread_heaps_in_use', + 'tcmalloc_page_size'] + if len(values) != len(categories): + self.log.debug('Received unexpected output from {}.{}; ' \ + 'number of values should match the number' \ + 'of expected categories:\n values: len={} {} '\ + '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs)) + else: + parsed_output = dict(zip(categories, values)) + else: + self.log.debug('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs)) + + return parsed_output + def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]: - # Initialize result dict - result: Dict[str, dict] = defaultdict(lambda: defaultdict(int)) + result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) + anonymized_daemons = {} + osd_map = self.get('osd_map') - # Get list of osd ids from the metadata - osd_metadata = self.get('osd_metadata') + # Combine available daemons + daemons = [] + for osd in osd_map['osds']: + daemons.append('osd'+'.'+str(osd['osd'])) + # perf_memory_metrics collection (2/2) + if self.is_enabled_collection(Collection.perf_memory_metrics): + mon_map = self.get('mon_map') + mds_metadata = self.get('mds_metadata') + for mon in mon_map['mons']: + daemons.append('mon'+'.'+mon['name']) + for mds in mds_metadata: + daemons.append('mds'+'.'+mds) - # Grab output from the "osd.x dump_mempools" command - for osd_id in osd_metadata: + # Grab output from the "dump_mempools" command + for daemon in daemons: + daemon_type, daemon_id = daemon.split('.') cmd_dict = { 'prefix': 'dump_mempools', - 'id': str(osd_id), 'format': 'json' } - r, outb, outs = self.osd_command(cmd_dict) + r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict) if r != 0: self.log.debug("Invalid command dictionary.") continue @@ -535,17 +582,25 @@ class Module(MgrModule): # This is where the mempool will land. dump = json.loads(outb) if mode == 'separated': - result["osd." + str(osd_id)] = dump['mempool']['by_pool'] + # Anonymize mon and mds + if daemon_type != 'osd': + anonymized_daemons[daemon] = self.anonymize_entity_name(daemon) + daemon = anonymized_daemons[daemon] + result[daemon_type][daemon] = dump['mempool']['by_pool'] elif mode == 'aggregated': for mem_type in dump['mempool']['by_pool']: - result[mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes'] - result[mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items'] + result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes'] + result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items'] else: self.log.debug("Incorrect mode specified in get_mempool") except (json.decoder.JSONDecodeError, KeyError) as e: - self.log.debug("Error caught on osd.{}: {}".format(osd_id, e)) + self.log.debug("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e)) continue + if anonymized_daemons: + # for debugging purposes only, this data is never reported + self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons)) + return result def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]: -- 2.39.5