]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/telemetry: add `perf_memory_metrics` collection to telemetry
authorLaura Flores <lflores@redhat.com>
Tue, 5 Jul 2022 22:06:15 +0000 (22:06 +0000)
committerLaura Flores <lflores@redhat.com>
Fri, 26 Aug 2022 22:32:12 +0000 (17:32 -0500)
This new collection includes heap stats and mempool metrics for
mon and mds daemons.

A `tell_command` function was introduced to the mgr module as a wrapper
around the `send_command` function to make it easier to run "tell"
admin socket commands.

Signed-off-by: Laura Flores <lflores@redhat.com>
(cherry picked from commit 1cd4edb6a675e816fc8a555fab141715aee6d070)

src/pybind/mgr/mgr_module.py
src/pybind/mgr/telemetry/module.py

index 9110227efcc9d14b3b0f956c9455edb583a6c5ce..1730cdc226fca262f41ecf3106270c5156e0bb5b 100644 (file)
@@ -1638,6 +1638,31 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
 
         return r
 
+    def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
+        """
+        Helper for `ceph tell` command execution.
+
+        See send_command for general case.
+
+        :param dict cmd_dict: expects a prefix i.e.:
+            cmd_dict = {
+                'prefix': 'heap',
+                'heapcmd': 'stats',
+            }
+        :return: status int, out std, err str
+        """
+        t1 = time.time()
+        result = CommandResult()
+        self.send_command(result, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf)
+        r = result.wait()
+        t2 = time.time()
+
+        self.log.debug("tell_command on {0}.{1}: '{2}' -> {3} in {4:.5f}s".format(
+            daemon_type, daemon_id, cmd_dict['prefix'], r[0], t2 - t1
+        ))
+
+        return r
+
     def send_command(
             self,
             result: CommandResult,
index bff9b843ee71b81bf8ea1cf88a72e69a63fa0147..75ab389905088b89055cffbcfbe2cff72fea7383 100644 (file)
@@ -69,6 +69,7 @@ class Collection(str, enum.Enum):
     basic_pool_usage = 'basic_pool_usage'
     basic_usage_by_class = 'basic_usage_by_class'
     basic_rook_v01 = 'basic_rook_v01'
+    perf_memory_metrics = 'perf_memory_metrics'
 
 MODULE_COLLECTION : List[Dict] = [
     {
@@ -125,6 +126,12 @@ MODULE_COLLECTION : List[Dict] = [
         "channel": "basic",
         "nag": True
     },
+    {
+        "name": Collection.perf_memory_metrics,
+        "description": "Heap stats and mempools for mon and mds",
+        "channel": "perf",
+        "nag": False
+    },
 ]
 
 ROOK_KEYS_BY_COLLECTION : List[Tuple[str, Collection]] = [
@@ -467,66 +474,106 @@ class Module(MgrModule):
         return  etype + '.' + m.hexdigest()
 
     def get_heap_stats(self) -> Dict[str, dict]:
-        # Initialize result dict
-        result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))
-
-        # Get list of osd ids from the metadata
-        osd_metadata = self.get('osd_metadata')
+        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+        anonymized_daemons = {}
+        osd_map = self.get('osd_map')
 
-        # Grab output from the "osd.x heap stats" command
-        for osd_id in osd_metadata:
-            cmd_dict = {
-                'prefix': 'heap',
-                'heapcmd': 'stats',
-                'id': str(osd_id),
-            }
-            r, outb, outs = self.osd_command(cmd_dict)
-            if r != 0:
-                self.log.debug("Invalid command dictionary.")
-                continue
+        # Combine available daemons
+        daemons = []
+        for osd in osd_map['osds']:
+            daemons.append('osd'+'.'+str(osd['osd']))
+        # perf_memory_metrics collection (1/2)
+        if self.is_enabled_collection(Collection.perf_memory_metrics):
+            mon_map = self.get('mon_map')
+            mds_metadata = self.get('mds_metadata')
+            for mon in mon_map['mons']:
+                daemons.append('mon'+'.'+mon['name'])
+            for mds in mds_metadata:
+                daemons.append('mds'+'.'+mds)
+
+        # Grab output from the "daemon.x heap stats" command
+        for daemon in daemons:
+            daemon_type, daemon_id = daemon.split('.')
+            heap_stats = self.parse_heap_stats(daemon_type, daemon_id)
+            if heap_stats:
+                if (daemon_type != 'osd'):
+                    # Anonymize mon and mds
+                    anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+                    daemon = anonymized_daemons[daemon]
+                result[daemon_type][daemon] = heap_stats
             else:
-                if 'tcmalloc heap stats' in outs:
-                    values = [int(i) for i in outs.split() if i.isdigit()]
-                    # `categories` must be ordered this way for the correct output to be parsed
-                    categories = ['use_by_application',
-                                  'page_heap_freelist',
-                                  'central_cache_freelist',
-                                  'transfer_cache_freelist',
-                                  'thread_cache_freelists',
-                                  'malloc_metadata',
-                                  'actual_memory_used',
-                                  'released_to_os',
-                                  'virtual_address_space_used',
-                                  'spans_in_use',
-                                  'thread_heaps_in_use',
-                                  'tcmalloc_page_size']
-                    if len(values) != len(categories):
-                        self.log.debug('Received unexpected output from osd.{}; number of values should match the number of expected categories:\n' \
-                                'values: len={} {} ~ categories: len={} {} ~ outs: {}'.format(osd_id, len(values), values, len(categories), categories, outs))
-                        continue
-                    osd = 'osd.' + str(osd_id)
-                    result[osd] = dict(zip(categories, values))
-                else:
-                    self.log.debug('No heap stats available on osd.{}: {}'.format(osd_id, outs))
-                    continue
+                continue
 
+        if anonymized_daemons:
+            # for debugging purposes only, this data is never reported
+            self.log.debug('Anonymized daemon mapping for telemetry heap_stats (anonymized: real): {}'.format(anonymized_daemons))
         return result
 
+    def parse_heap_stats(self, daemon_type: str, daemon_id: Any) -> Dict[str, int]:
+        parsed_output = {}
+
+        cmd_dict = {
+            'prefix': 'heap',
+            'heapcmd': 'stats'
+        }
+        r, outb, outs = self.tell_command(daemon_type, str(daemon_id), cmd_dict)
+
+        if r != 0:
+            self.log.debug("Invalid command dictionary.")
+        else:
+            if 'tcmalloc heap stats' in outs:
+                values = [int(i) for i in outs.split() if i.isdigit()]
+                # `categories` must be ordered this way for the correct output to be parsed
+                categories = ['use_by_application',
+                              'page_heap_freelist',
+                              'central_cache_freelist',
+                              'transfer_cache_freelist',
+                              'thread_cache_freelists',
+                              'malloc_metadata',
+                              'actual_memory_used',
+                              'released_to_os',
+                              'virtual_address_space_used',
+                              'spans_in_use',
+                              'thread_heaps_in_use',
+                              'tcmalloc_page_size']
+                if len(values) != len(categories):
+                    self.log.debug('Received unexpected output from {}.{}; ' \
+                                   'number of values should match the number' \
+                                   'of expected categories:\n values: len={} {} '\
+                                   '~ categories: len={} {} ~ outs: {}'.format(daemon_type, daemon_id, len(values), values, len(categories), categories, outs))
+                else:
+                    parsed_output = dict(zip(categories, values))
+            else:
+                self.log.debug('No heap stats available on {}.{}: {}'.format(daemon_type, daemon_id, outs))
+        
+        return parsed_output
+
     def get_mempool(self, mode: str = 'separated') -> Dict[str, dict]:
-        # Initialize result dict
-        result: Dict[str, dict] = defaultdict(lambda: defaultdict(int))
+        result: Dict[str, dict] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
+        anonymized_daemons = {}
+        osd_map = self.get('osd_map')
 
-        # Get list of osd ids from the metadata
-        osd_metadata = self.get('osd_metadata')
+        # Combine available daemons
+        daemons = []
+        for osd in osd_map['osds']:
+            daemons.append('osd'+'.'+str(osd['osd']))
+        # perf_memory_metrics collection (2/2)
+        if self.is_enabled_collection(Collection.perf_memory_metrics):
+            mon_map = self.get('mon_map')
+            mds_metadata = self.get('mds_metadata')
+            for mon in mon_map['mons']:
+                daemons.append('mon'+'.'+mon['name'])
+            for mds in mds_metadata:
+                daemons.append('mds'+'.'+mds)
 
-        # Grab output from the "osd.x dump_mempools" command
-        for osd_id in osd_metadata:
+        # Grab output from the "dump_mempools" command
+        for daemon in daemons:
+            daemon_type, daemon_id = daemon.split('.')
             cmd_dict = {
                 'prefix': 'dump_mempools',
-                'id': str(osd_id),
                 'format': 'json'
             }
-            r, outb, outs = self.osd_command(cmd_dict)
+            r, outb, outs = self.tell_command(daemon_type, daemon_id, cmd_dict)
             if r != 0:
                 self.log.debug("Invalid command dictionary.")
                 continue
@@ -535,17 +582,25 @@ class Module(MgrModule):
                     # This is where the mempool will land.
                     dump = json.loads(outb)
                     if mode == 'separated':
-                        result["osd." + str(osd_id)] = dump['mempool']['by_pool']
+                        # Anonymize mon and mds
+                        if daemon_type != 'osd':
+                            anonymized_daemons[daemon] = self.anonymize_entity_name(daemon)
+                            daemon = anonymized_daemons[daemon]
+                        result[daemon_type][daemon] = dump['mempool']['by_pool']
                     elif mode == 'aggregated':
                         for mem_type in dump['mempool']['by_pool']:
-                            result[mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
-                            result[mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
+                            result[daemon_type][mem_type]['bytes'] += dump['mempool']['by_pool'][mem_type]['bytes']
+                            result[daemon_type][mem_type]['items'] += dump['mempool']['by_pool'][mem_type]['items']
                     else:
                         self.log.debug("Incorrect mode specified in get_mempool")
                 except (json.decoder.JSONDecodeError, KeyError) as e:
-                    self.log.debug("Error caught on osd.{}: {}".format(osd_id, e))
+                    self.log.debug("Error caught on {}.{}: {}".format(daemon_type, daemon_id, e))
                     continue
 
+        if anonymized_daemons:
+            # for debugging purposes only, this data is never reported
+            self.log.debug('Anonymized daemon mapping for telemetry mempool (anonymized: real): {}'.format(anonymized_daemons))
+
         return result
 
     def get_osd_histograms(self, mode: str = 'separated') -> List[Dict[str, dict]]: