]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
prometheus: Fix metric resets
authorBoris Ranto <branto@redhat.com>
Wed, 27 Jun 2018 07:50:04 +0000 (09:50 +0200)
committerBoris Ranto <branto@redhat.com>
Tue, 17 Jul 2018 20:28:59 +0000 (22:28 +0200)
This patch changes the way we reset metrics when collecting data in the
prometheus exporter module. With this patch, the exported metrics at any
point in time should align with a freshly started ceph-mgr module.

The patch also introduces locking mechanism to serialize the requests
and to avoid overwriting the metrics during multiple scrapes happenning
at the same time.

Signed-off-by: Boris Ranto <branto@redhat.com>
src/pybind/mgr/prometheus/module.py

index 0d5d4febed88f28ae7cd82b4cb4acace288e0fd3..3b556b9f3f3932d8a75462cfa1e3a4c7c9701bbe 100644 (file)
@@ -5,7 +5,7 @@ import math
 import os
 import socket
 import threading
-from collections import OrderedDict
+import time
 from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
 
 # Defaults for the Prometheus HTTP server.  Can also set in config-key
@@ -78,40 +78,30 @@ NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
 class Metrics(object):
     def __init__(self):
         self.metrics = self._setup_static_metrics()
-        self.pending = {}
 
-    def set(self, key, value, labels=('',)):
+    def clear(self):
         '''
-        Set the value of a single Metrics. This should be used for static metrics,
-        e.g. cluster health.
+        Clear all the metrics data. This does not remove the initiated Metric classes.
         '''
-        self.metrics[key].set(value, labels)
+        for k in self.metrics.keys():
+            self.metrics[k].clear()
 
-    def append(self, key, value, labels = ('',)):
+    def set(self, key, value, labels=('',)):
         '''
-        Append a metrics to the staging area. Use this to aggregate daemon specific
-        metrics that can appear and go away as daemons are added or removed.
+        Set the value of a single Metric (with labels). Use this to set the value of any metric.
         '''
-        if key not in self.pending:
-            self.pending[key] = []
-        self.pending[key].append((labels, value))
+        self.metrics[key].set(value, labels)
 
-    def reset(self):
+    def all(self):
         '''
-        When metrics aggregation is done, call Metrics.reset() to apply the
-        aggregated metric. This will remove all label -> value mappings for a
-        metric and set the new mapping (from pending). This means daemon specific
-        metrics os daemons that do no longer exist, are removed.
+        Return the dict of all the metrics.
         '''
-        for k, v in self.pending.items():
-            self.metrics[k].reset(v)
-        self.pending = {}
+        return self.metrics
 
     def add_metric(self, path, metric):
         if path not in self.metrics:
             self.metrics[path] = metric
 
-
     def _setup_static_metrics(self):
         metrics = {}
         metrics['health_status'] = Metric(
@@ -236,7 +226,6 @@ class Metrics(object):
         return metrics
 
 
-
 class Metric(object):
     def __init__(self, mtype, name, desc, labels=None):
         self.mtype = mtype
@@ -245,16 +234,14 @@ class Metric(object):
         self.labelnames = labels    # tuple if present
         self.value = {}             # indexed by label values
 
+    def clear(self):
+        self.value = {}
+
     def set(self, value, labelvalues=None):
         # labelvalues must be a tuple
         labelvalues = labelvalues or ('',)
         self.value[labelvalues] = value
 
-    def reset(self, values):
-        self.value = {}
-        for labelvalues, value in values:
-            self.value[labelvalues] = value
-
     def str_expfmt(self):
 
         def promethize(path):
@@ -329,8 +316,11 @@ class Module(MgrModule):
     def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
         self.metrics = Metrics()
-        self.schema = OrderedDict()
         self.shutdown_event = threading.Event()
+        self.collect_lock = threading.RLock()
+        self.collect_time = 0
+        self.collect_timeout = 5.0
+        self.collect_cache = None
         _global_instance['plugin'] = self
 
     def get_health(self):
@@ -347,7 +337,7 @@ class Module(MgrModule):
 
         for pool in df['pools']:
             for stat in DF_POOL:
-                self.metrics.append('pool_{}'.format(stat),
+                self.metrics.set('pool_{}'.format(stat),
                                     pool['stats'][stat],
                                     (pool['id'],))
 
@@ -358,7 +348,7 @@ class Module(MgrModule):
         for fs in fs_map['filesystems']:
             # collect fs metadata
             data_pools = ",".join([str(pool) for pool in fs['mdsmap']['data_pools']])
-            self.metrics.append('fs_metadata', 1,
+            self.metrics.set('fs_metadata', 1,
                                 (data_pools,
                                  fs['id'],
                                  fs['mdsmap']['metadata_pool'],
@@ -367,7 +357,7 @@ class Module(MgrModule):
             for gid, daemon in fs['mdsmap']['info'].items():
                 id_ = daemon['name']
                 host_version = servers.get((id_, 'mds'), ('',''))
-                self.metrics.append('mds_metadata', 1,
+                self.metrics.set('mds_metadata', 1,
                                     ('mds.{}'.format(id_), fs['id'],
                                      host_version[0], daemon['addr'],
                                      daemon['rank'], host_version[1]))
@@ -379,12 +369,12 @@ class Module(MgrModule):
             rank = mon['rank']
             id_ = mon['name']
             host_version = servers.get((id_, 'mon'), ('',''))
-            self.metrics.append('mon_metadata', 1,
+            self.metrics.set('mon_metadata', 1,
                                 ('mon.{}'.format(id_), host_version[0],
                                  mon['public_addr'].split(':')[0], rank,
                                  host_version[1]))
             in_quorum = int(rank in mon_status['quorum'])
-            self.metrics.append('mon_quorum_status', in_quorum,
+            self.metrics.set('mon_quorum_status', in_quorum,
                                 ('mon.{}'.format(id_),))
 
     def get_pg_status(self):
@@ -419,7 +409,7 @@ class Module(MgrModule):
             id_ = osd['osd']
             for stat in OSD_STATS:
                 val = osd['perf_stat'][stat]
-                self.metrics.append('osd_{}'.format(stat), val,
+                self.metrics.set('osd_{}'.format(stat), val,
                                     ('osd.{}'.format(id_),))
 
     def get_service_list(self):
@@ -467,7 +457,7 @@ class Module(MgrModule):
 
             host_version = servers.get((str(id_), 'osd'), ('',''))
 
-            self.metrics.append('osd_metadata', 1, (
+            self.metrics.set('osd_metadata', 1, (
                 'osd.{}'.format(id_),
                 c_addr,
                 dev_class,
@@ -478,7 +468,7 @@ class Module(MgrModule):
             # collect osd status
             for state in OSD_STATUS:
                 status = osd[state]
-                self.metrics.append('osd_{}'.format(state), status,
+                self.metrics.set('osd_{}'.format(state), status,
                                     ('osd.{}'.format(id_),))
 
             # collect disk occupation metadata
@@ -507,7 +497,7 @@ class Module(MgrModule):
 
         pool_meta = []
         for pool in osd_map['pools']:
-            self.metrics.append('pool_metadata', 1, (pool['pool'], pool['pool_name']))
+            self.metrics.set('pool_metadata', 1, (pool['pool'], pool['pool_name']))
 
         # Populate rgw_metadata
         for key, value in servers.items():
@@ -515,7 +505,7 @@ class Module(MgrModule):
             if service_type != 'rgw':
                 continue
             hostname, version = value
-            self.metrics.append(
+            self.metrics.set(
                 'rgw_metadata',
                 1,
                 ('{}.{}'.format(service_type, service_id), hostname, version)
@@ -528,6 +518,9 @@ class Module(MgrModule):
             self.metrics.set(stat, pg_sum[stat])
 
     def collect(self):
+        # Clear the metrics before scraping
+        self.metrics.clear()
+
         self.get_health()
         self.get_df()
         self.get_fs()
@@ -557,7 +550,7 @@ class Module(MgrModule):
                         counter_info['description'] + ' Total',
                         ("ceph_daemon",),
                     ))
-                    self.metrics.append(_path, value, (daemon,))
+                    self.metrics.set(_path, value, (daemon,))
 
                     _path = path + '_count'
                     self.metrics.add_metric(_path, Metric(
@@ -566,7 +559,7 @@ class Module(MgrModule):
                         counter_info['description'] + ' Count',
                         ("ceph_daemon",),
                     ))
-                    self.metrics.append(_path, counter_info['count'], (daemon,))
+                    self.metrics.set(_path, counter_info['count'], (daemon,))
                 else:
                     self.metrics.add_metric(path, Metric(
                         stattype,
@@ -574,12 +567,10 @@ class Module(MgrModule):
                         counter_info['description'],
                         ("ceph_daemon",),
                     ))
-                    self.metrics.append(path, value, (daemon,))
+                    self.metrics.set(path, value, (daemon,))
 
-        # It is sufficient to reset the pending metrics once per scrape
-        self.metrics.reset()
 
-        return self.metrics.metrics
+        return self.metrics.all()
 
     def get_file_sd_config(self):
         servers = self.list_servers()
@@ -656,11 +647,25 @@ class Module(MgrModule):
 
             @cherrypy.expose
             def metrics(self):
-                if global_instance().have_mon_connection():
-                    metrics = global_instance().collect()
+                inst = global_instance()
+                # Lock the function execution
+                try:
+                    inst.collect_lock.acquire()
+                    return self._metrics(inst)
+                finally:
+                    inst.collect_lock.release()
+
+            def _metrics(self, inst):
+                # Return cached data if available and collected before the cache times out
+                if inst.collect_cache and time.time() - inst.collect_time  < inst.collect_timeout:
+                    return inst.collect_cache
+
+                if inst.have_mon_connection():
+                    metrics = inst.collect()
                     cherrypy.response.headers['Content-Type'] = 'text/plain'
                     if metrics:
-                        return self.format_metrics(metrics)
+                        inst.collect_cache = self.format_metrics(metrics)
+                        return inst.collect_cache
                 else:
                     raise cherrypy.HTTPError(503, 'No MON connection')