pybind/mgr/prometheus: add Metrics class to manage Metric instances
author    Jan Fajerski <jfajerski@suse.com>
          Sat, 17 Feb 2018 11:07:46 +0000 (12:07 +0100)
committer Jan Fajerski <jfajerski@suse.com>
          Thu, 1 Mar 2018 07:43:09 +0000 (08:43 +0100)
The central change of this commit is that per-daemon metrics are now
managed by first appending each metric (using Metrics.append) to a
staging area. The metrics for the affected paths (metric names) are then
overwritten by the staged metrics (by calling Metrics.reset). This gets
rid of metrics from daemons that are no longer in the cluster, i.e. when
Ceph no longer reports metrics for an OSD daemon (because it was removed
from the cluster), the prometheus module will no longer export metrics
for that daemon.
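
To illustrate, a minimal self-contained sketch of this staging pattern
(toy stand-ins for the Metric/Metrics classes in the diff below; the
'osd_up' metric and its values are made up for illustration, this is not
the actual module code):

    class Metric(object):
        def __init__(self, name):
            self.name = name
            self.value = {}        # label values -> sample value

        def reset(self, values):
            # replace all label -> value mappings with the staged samples
            self.value = dict(values)

    class Metrics(object):
        def __init__(self):
            self.metrics = {'osd_up': Metric('osd_up')}
            self.pending = {}      # staging area, filled during a scrape

        def append(self, key, value, labels=('',)):
            self.pending.setdefault(key, []).append((labels, value))

        def reset(self):
            # apply the staged samples; labels not re-reported disappear
            for key, values in self.pending.items():
                self.metrics[key].reset(values)
            self.pending = {}

    m = Metrics()
    # first scrape: two OSDs report
    m.append('osd_up', 1, ('osd.0',))
    m.append('osd_up', 1, ('osd.1',))
    m.reset()
    # second scrape: osd.1 was removed from the cluster and reports nothing
    m.append('osd_up', 1, ('osd.0',))
    m.reset()
    print(m.metrics['osd_up'].value)   # {('osd.0',): 1} -- osd.1 is gone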

Signed-off-by: Jan Fajerski <jfajerski@suse.com>
src/pybind/mgr/prometheus/module.py

index d7f4fb2df02dce9925a4e09465bb1e1d36dec15d..0b0df46a6e655e9cbf8079bc38a7d88657d7f05c 100644
@@ -67,85 +67,42 @@ POOL_METADATA = ('pool_id', 'name')
 DISK_OCCUPATION = ('instance', 'device', 'ceph_daemon')
 
 
-class Metric(object):
-    def __init__(self, mtype, name, desc, labels=None):
-        self.mtype = mtype
-        self.name = name
-        self.desc = desc
-        self.labelnames = labels    # tuple if present
-        self.value = dict()         # indexed by label values
-
-    def set(self, value, labelvalues=None):
-        # labelvalues must be a tuple
-        labelvalues = labelvalues or ('',)
-        self.value[labelvalues] = value
-
-    def str_expfmt(self):
-
-        def promethize(path):
-            ''' replace illegal metric name characters '''
-            result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')
-
-            # Hyphens usually turn into underscores, unless they are
-            # trailing
-            if result.endswith("-"):
-                result = result[0:-1] + "_minus"
-            else:
-                result = result.replace("-", "_")
-
-            return "ceph_{0}".format(result)
-
-        def floatstr(value):
-            ''' represent as Go-compatible float '''
-            if value == float('inf'):
-                return '+Inf'
-            if value == float('-inf'):
-                return '-Inf'
-            if math.isnan(value):
-                return 'NaN'
-            return repr(float(value))
-
-        name = promethize(self.name)
-        expfmt = '''
-# HELP {name} {desc}
-# TYPE {name} {mtype}'''.format(
-            name=name,
-            desc=self.desc,
-            mtype=self.mtype,
-        )
-
-        for labelvalues, value in self.value.items():
-            if self.labelnames:
-                labels = zip(self.labelnames, labelvalues)
-                labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
-            else:
-                labels = ''
-            if labels:
-                fmtstr = '\n{name}{{{labels}}} {value}'
-            else:
-                fmtstr = '\n{name} {value}'
-            expfmt += fmtstr.format(
-                name=name,
-                labels=labels,
-                value=floatstr(value),
-            )
-        return expfmt
-
-
-class Module(MgrModule):
-    COMMANDS = [
-        {
-            "cmd": "prometheus self-test",
-            "desc": "Run a self test on the prometheus module",
-            "perm": "rw"
-        },
-    ]
-
-    def __init__(self, *args, **kwargs):
-        super(Module, self).__init__(*args, **kwargs)
+class Metrics(object):
+    def __init__(self):
         self.metrics = self._setup_static_metrics()
-        self.schema = OrderedDict()
-        _global_instance['plugin'] = self
+        self.pending = {}
+
+    def set(self, key, value, labels=('',)):
+        '''
+        Set the value of a single Metric. This should be used for static metrics,
+        e.g. cluster health.
+        '''
+        self.metrics[key].set(value, labels)
+
+    def append(self, key, value, labels=('',)):
+        '''
+        Append a metric to the staging area. Use this to aggregate daemon-specific
+        metrics that can appear and go away as daemons are added or removed.
+        '''
+        if key not in self.pending:
+            self.pending[key] = []
+        self.pending[key].append((labels, value))
+
+    def reset(self):
+        '''
+        When metrics aggregation is done, call Metrics.reset() to apply the
+        aggregated metrics. This will remove all label -> value mappings for a
+        metric and set the new mapping (from pending). This means daemon-specific
+        metrics of daemons that no longer exist are removed.
+        '''
+        for k, v in self.pending.items():
+            self.metrics[k].reset(v)
+        self.pending = {}
+
+    def add_metric(self, path, metric):
+        if path not in self.metrics:
+            self.metrics[path] = metric
+
 
     def _setup_static_metrics(self):
         metrics = {}
@@ -198,7 +155,6 @@ class Module(MgrModule):
             )
         for state in OSD_STATUS:
             path = 'osd_{}'.format(state)
-            self.log.debug("init: creating {}".format(path))
             metrics[path] = Metric(
                 'untyped',
                 path,
@@ -207,7 +163,6 @@ class Module(MgrModule):
             )
         for stat in OSD_STATS:
             path = 'osd_{}'.format(stat)
-            self.log.debug("init: creating {}".format(path))
             metrics[path] = Metric(
                 'gauge',
                 path,
@@ -216,7 +171,6 @@ class Module(MgrModule):
             )
         for state in PG_STATES:
             path = 'pg_{}'.format(state)
-            self.log.debug("init: creating {}".format(path))
             metrics[path] = Metric(
                 'gauge',
                 path,
@@ -224,7 +178,6 @@ class Module(MgrModule):
             )
         for state in DF_CLUSTER:
             path = 'cluster_{}'.format(state)
-            self.log.debug("init: creating {}".format(path))
             metrics[path] = Metric(
                 'gauge',
                 path,
@@ -232,7 +185,6 @@ class Module(MgrModule):
             )
         for state in DF_POOL:
             path = 'pool_{}'.format(state)
-            self.log.debug("init: creating {}".format(path))
             metrics[path] = Metric(
                 'gauge',
                 path,
@@ -242,36 +194,122 @@ class Module(MgrModule):
 
         return metrics
 
+
+
+class Metric(object):
+    def __init__(self, mtype, name, desc, labels=None):
+        self.mtype = mtype
+        self.name = name
+        self.desc = desc
+        self.labelnames = labels    # tuple if present
+        self.value = {}             # indexed by label values
+
+    def set(self, value, labelvalues=None):
+        # labelvalues must be a tuple
+        labelvalues = labelvalues or ('',)
+        self.value[labelvalues] = value
+
+    def reset(self, values):
+        self.value = {}
+        for labelvalues, value in values:
+            self.value[labelvalues] = value
+
+    def str_expfmt(self):
+
+        def promethize(path):
+            ''' replace illegal metric name characters '''
+            result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')
+
+            # Hyphens usually turn into underscores, unless they are
+            # trailing
+            if result.endswith("-"):
+                result = result[0:-1] + "_minus"
+            else:
+                result = result.replace("-", "_")
+
+            return "ceph_{0}".format(result)
+
+        def floatstr(value):
+            ''' represent as Go-compatible float '''
+            if value == float('inf'):
+                return '+Inf'
+            if value == float('-inf'):
+                return '-Inf'
+            if math.isnan(value):
+                return 'NaN'
+            return repr(float(value))
+
+        name = promethize(self.name)
+        expfmt = '''
+# HELP {name} {desc}
+# TYPE {name} {mtype}'''.format(
+            name=name,
+            desc=self.desc,
+            mtype=self.mtype,
+        )
+
+        for labelvalues, value in self.value.items():
+            if self.labelnames:
+                labels = zip(self.labelnames, labelvalues)
+                labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
+            else:
+                labels = ''
+            if labels:
+                fmtstr = '\n{name}{{{labels}}} {value}'
+            else:
+                fmtstr = '\n{name} {value}'
+            expfmt += fmtstr.format(
+                name=name,
+                labels=labels,
+                value=floatstr(value),
+            )
+        return expfmt
+
+
+class Module(MgrModule):
+    COMMANDS = [
+        {
+            "cmd": "prometheus self-test",
+            "desc": "Run a self test on the prometheus module",
+            "perm": "rw"
+        },
+    ]
+
+    def __init__(self, *args, **kwargs):
+        super(Module, self).__init__(*args, **kwargs)
+        self.metrics = Metrics()
+        self.schema = OrderedDict()
+        _global_instance['plugin'] = self
+
     def get_health(self):
         health = json.loads(self.get('health')['json'])
-        self.metrics['health_status'].set(
-            health_status_to_number(health['status'])
+        self.metrics.set('health_status',
+                         health_status_to_number(health['status'])
         )
 
     def get_df(self):
         # maybe get the to-be-exported metrics from a config?
         df = self.get('df')
         for stat in DF_CLUSTER:
-            path = 'cluster_{}'.format(stat)
-            self.metrics[path].set(df['stats'][stat])
+            self.metrics.set('cluster_{}'.format(stat), df['stats'][stat])
 
         for pool in df['pools']:
             for stat in DF_POOL:
-                path = 'pool_{}'.format(stat)
-                self.metrics[path].set(pool['stats'][stat], (pool['id'],))
+                self.metrics.append('pool_{}'.format(stat),
+                                    pool['stats'][stat],
+                                    (pool['id'],))
 
     def get_quorum_status(self):
         mon_status = json.loads(self.get('mon_status')['json'])
-        self.metrics['mon_quorum_count'].set(len(mon_status['quorum']))
+        self.metrics.set('mon_quorum_count', len(mon_status['quorum']))
+        # maybe rather use mon_in_quorum{id=$rank} [0,1]?
 
     def get_pg_status(self):
         # TODO add per pool status?
         pg_status = self.get('pg_status')
 
         # Set total count of PGs, first
-        self.metrics['pg_total'].set(
-            pg_status['num_pgs'],
-        )
+        self.metrics.set('pg_total', pg_status['num_pgs'])
 
         reported_states = {}
         for pg in pg_status['pgs_by_state']:
@@ -281,15 +319,14 @@ class Module(MgrModule):
         for state in reported_states:
             path = 'pg_{}'.format(state)
             try:
-                self.metrics[path].set(reported_states[state])
+                self.metrics.set(path, reported_states[state])
             except KeyError:
                 self.log.warn("skipping pg in unknown state {}".format(state))
 
         for state in PG_STATES:
-            path = 'pg_{}'.format(state)
             if state not in reported_states:
                 try:
-                    self.metrics[path].set(0)
+                    self.metrics.set('pg_{}'.format(state), 0)
                 except KeyError:
                     self.log.warn("skipping pg in unknown state {}".format(state))
 
@@ -298,10 +335,9 @@ class Module(MgrModule):
         for osd in osd_stats['osd_stats']:
             id_ = osd['osd']
             for stat in OSD_STATS:
-                status = osd['perf_stat'][stat]
-                self.metrics['osd_{}'.format(stat)].set(
-                    status,
-                    ('osd.{}'.format(id_),))
+                val = osd['perf_stat'][stat]
+                self.metrics.append('osd_{}'.format(stat), val,
+                                    ('osd.{}'.format(id_),))
 
     def get_metadata_and_osd_status(self):
         osd_map = self.get('osd_map')
@@ -312,7 +348,9 @@ class Module(MgrModule):
             )
         osd_devices = self.get('osd_map_crush')['devices']
         for osd in osd_map['osds']:
+            # id can be used to link osd metrics and metadata
             id_ = osd['osd']
+            # collect osd metadata
             p_addr = osd['public_addr'].split(':')[0]
             c_addr = osd['cluster_addr'].split(':')[0]
             if p_addr == "-" or c_addr == "-":
@@ -322,18 +360,15 @@ class Module(MgrModule):
                 )
                 continue
             dev_class = next((osd for osd in osd_devices if osd['id'] == id_))
-            self.metrics['osd_metadata'].set(1, (
-                c_addr,
-                dev_class['class'],
-                id_,
-                p_addr
-            ))
+            self.metrics.append('osd_metadata', 1,
+                                (c_addr, dev_class.get('class', ''), id_, p_addr))
+
+            # collect osd status
             for state in OSD_STATUS:
                 status = osd[state]
-                self.metrics['osd_{}'.format(state)].set(
-                    status,
-                    ('osd.{}'.format(id_),))
+                self.metrics.append('osd_{}'.format(state), status,
+                                    ('osd.{}'.format(id_),))
 
+            # collect disk occupation metadata
             osd_metadata = self.get_metadata("osd", str(id_))
             if osd_metadata is None:
                 continue
@@ -348,7 +383,7 @@ class Module(MgrModule):
             if osd_dev_node and osd_hostname:
                 self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                     id_, osd_hostname, osd_dev_node))
-                self.metrics['disk_occupation'].set(1, (
+                self.metrics.set('disk_occupation', 1, (
                     osd_hostname,
                     osd_dev_node,
                     "osd.{0}".format(id_)
@@ -357,10 +392,9 @@ class Module(MgrModule):
                 self.log.info("Missing dev node metadata for osd {0}, skipping "
                                "occupation record for this osd".format(id_))
 
         for pool in osd_map['pools']:
-            id_ = pool['pool']
-            name = pool['pool_name']
-            self.metrics['pool_metadata'].set(1, (id_, name))
+            self.metrics.append('pool_metadata', 1, (pool['pool'], pool['pool_name']))
 
     def collect(self):
         self.get_health()
@@ -370,7 +404,7 @@ class Module(MgrModule):
         self.get_metadata_and_osd_status()
         self.get_pg_status()
 
-        for daemon, counters in self.get_all_perf_counters().iteritems():
+        for daemon, counters in self.get_all_perf_counters().items():
             for path, counter_info in counters.items():
                 stattype = self._stattype_to_str(counter_info['type'])
                 # XXX simplify first effort: no histograms
@@ -379,20 +413,18 @@ class Module(MgrModule):
                     self.log.debug('ignoring %s, type %s' % (path, stattype))
                     continue
 
-                if path not in self.metrics:
-                    self.metrics[path] = Metric(
+                self.metrics.add_metric(path, Metric(
                         stattype,
                         path,
                         counter_info['description'],
                         ("ceph_daemon",),
-                    )
+                    ))
 
-                self.metrics[path].set(
-                    counter_info['value'],
-                    (daemon,)
-                )
+                self.metrics.append(path, counter_info['value'], (daemon,))
+        # It is sufficient to reset the pending metrics once per scrape
+        self.metrics.reset()
 
-        return self.metrics
+        return self.metrics.metrics
 
     def handle_command(self, cmd):
         if cmd['prefix'] == 'prometheus self-test':