From f6c4db8c11471c8c11e387d37833c2b49872b725 Mon Sep 17 00:00:00 2001 From: Boris Ranto Date: Wed, 27 Jun 2018 09:50:04 +0200 Subject: [PATCH] prometheus: Fix metric resets This patch changes the way we reset metrics when collecting data in the prometheus exporter module. With this patch, the exported metrics at any point in time should align with a freshly started ceph-mgr module. The patch also introduces a locking mechanism to serialize the requests and to avoid overwriting the metrics during multiple scrapes happening at the same time. Signed-off-by: Boris Ranto --- src/pybind/mgr/prometheus/module.py | 99 +++++++++++++++-------------- 1 file changed, 52 insertions(+), 47 deletions(-) diff --git a/src/pybind/mgr/prometheus/module.py b/src/pybind/mgr/prometheus/module.py index 0d5d4febed88f..3b556b9f3f393 100644 --- a/src/pybind/mgr/prometheus/module.py +++ b/src/pybind/mgr/prometheus/module.py @@ -5,7 +5,7 @@ import math import os import socket import threading -from collections import OrderedDict +import time from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES # Defaults for the Prometheus HTTP server. Can also set in config-key @@ -78,40 +78,30 @@ NUM_OBJECTS = ['degraded', 'misplaced', 'unfound'] class Metrics(object): def __init__(self): self.metrics = self._setup_static_metrics() - self.pending = {} - def set(self, key, value, labels=('',)): + def clear(self): ''' - Set the value of a single Metrics. This should be used for static metrics, - e.g. cluster health. + Clear all the metrics data. This does not remove the initiated Metric classes. ''' - self.metrics[key].set(value, labels) + for k in self.metrics.keys(): + self.metrics[k].clear() - def append(self, key, value, labels = ('',)): + def set(self, key, value, labels=('',)): ''' - Append a metrics to the staging area. Use this to aggregate daemon specific - metrics that can appear and go away as daemons are added or removed. + Set the value of a single Metric (with labels). 
Use this to set the value of any metric. ''' - if key not in self.pending: - self.pending[key] = [] - self.pending[key].append((labels, value)) + self.metrics[key].set(value, labels) - def reset(self): + def all(self): ''' - When metrics aggregation is done, call Metrics.reset() to apply the - aggregated metric. This will remove all label -> value mappings for a - metric and set the new mapping (from pending). This means daemon specific - metrics os daemons that do no longer exist, are removed. + Return the dict of all the metrics. ''' - for k, v in self.pending.items(): - self.metrics[k].reset(v) - self.pending = {} + return self.metrics def add_metric(self, path, metric): if path not in self.metrics: self.metrics[path] = metric - def _setup_static_metrics(self): metrics = {} metrics['health_status'] = Metric( @@ -236,7 +226,6 @@ class Metrics(object): return metrics - class Metric(object): def __init__(self, mtype, name, desc, labels=None): self.mtype = mtype @@ -245,16 +234,14 @@ class Metric(object): self.labelnames = labels # tuple if present self.value = {} # indexed by label values + def clear(self): + self.value = {} + def set(self, value, labelvalues=None): # labelvalues must be a tuple labelvalues = labelvalues or ('',) self.value[labelvalues] = value - def reset(self, values): - self.value = {} - for labelvalues, value in values: - self.value[labelvalues] = value - def str_expfmt(self): def promethize(path): @@ -329,8 +316,11 @@ class Module(MgrModule): def __init__(self, *args, **kwargs): super(Module, self).__init__(*args, **kwargs) self.metrics = Metrics() - self.schema = OrderedDict() self.shutdown_event = threading.Event() + self.collect_lock = threading.RLock() + self.collect_time = 0 + self.collect_timeout = 5.0 + self.collect_cache = None _global_instance['plugin'] = self def get_health(self): @@ -347,7 +337,7 @@ class Module(MgrModule): for pool in df['pools']: for stat in DF_POOL: - self.metrics.append('pool_{}'.format(stat), + 
self.metrics.set('pool_{}'.format(stat), pool['stats'][stat], (pool['id'],)) @@ -358,7 +348,7 @@ class Module(MgrModule): for fs in fs_map['filesystems']: # collect fs metadata data_pools = ",".join([str(pool) for pool in fs['mdsmap']['data_pools']]) - self.metrics.append('fs_metadata', 1, + self.metrics.set('fs_metadata', 1, (data_pools, fs['id'], fs['mdsmap']['metadata_pool'], @@ -367,7 +357,7 @@ class Module(MgrModule): for gid, daemon in fs['mdsmap']['info'].items(): id_ = daemon['name'] host_version = servers.get((id_, 'mds'), ('','')) - self.metrics.append('mds_metadata', 1, + self.metrics.set('mds_metadata', 1, ('mds.{}'.format(id_), fs['id'], host_version[0], daemon['addr'], daemon['rank'], host_version[1])) @@ -379,12 +369,12 @@ class Module(MgrModule): rank = mon['rank'] id_ = mon['name'] host_version = servers.get((id_, 'mon'), ('','')) - self.metrics.append('mon_metadata', 1, + self.metrics.set('mon_metadata', 1, ('mon.{}'.format(id_), host_version[0], mon['public_addr'].split(':')[0], rank, host_version[1])) in_quorum = int(rank in mon_status['quorum']) - self.metrics.append('mon_quorum_status', in_quorum, + self.metrics.set('mon_quorum_status', in_quorum, ('mon.{}'.format(id_),)) def get_pg_status(self): @@ -419,7 +409,7 @@ class Module(MgrModule): id_ = osd['osd'] for stat in OSD_STATS: val = osd['perf_stat'][stat] - self.metrics.append('osd_{}'.format(stat), val, + self.metrics.set('osd_{}'.format(stat), val, ('osd.{}'.format(id_),)) def get_service_list(self): @@ -467,7 +457,7 @@ class Module(MgrModule): host_version = servers.get((str(id_), 'osd'), ('','')) - self.metrics.append('osd_metadata', 1, ( + self.metrics.set('osd_metadata', 1, ( 'osd.{}'.format(id_), c_addr, dev_class, @@ -478,7 +468,7 @@ class Module(MgrModule): # collect osd status for state in OSD_STATUS: status = osd[state] - self.metrics.append('osd_{}'.format(state), status, + self.metrics.set('osd_{}'.format(state), status, ('osd.{}'.format(id_),)) # collect disk occupation 
metadata @@ -507,7 +497,7 @@ class Module(MgrModule): pool_meta = [] for pool in osd_map['pools']: - self.metrics.append('pool_metadata', 1, (pool['pool'], pool['pool_name'])) + self.metrics.set('pool_metadata', 1, (pool['pool'], pool['pool_name'])) # Populate rgw_metadata for key, value in servers.items(): @@ -515,7 +505,7 @@ class Module(MgrModule): if service_type != 'rgw': continue hostname, version = value - self.metrics.append( + self.metrics.set( 'rgw_metadata', 1, ('{}.{}'.format(service_type, service_id), hostname, version) @@ -528,6 +518,9 @@ class Module(MgrModule): self.metrics.set(stat, pg_sum[stat]) def collect(self): + # Clear the metrics before scraping + self.metrics.clear() + self.get_health() self.get_df() self.get_fs() @@ -557,7 +550,7 @@ class Module(MgrModule): counter_info['description'] + ' Total', ("ceph_daemon",), )) - self.metrics.append(_path, value, (daemon,)) + self.metrics.set(_path, value, (daemon,)) _path = path + '_count' self.metrics.add_metric(_path, Metric( @@ -566,7 +559,7 @@ class Module(MgrModule): counter_info['description'] + ' Count', ("ceph_daemon",), )) - self.metrics.append(_path, counter_info['count'], (daemon,)) + self.metrics.set(_path, counter_info['count'], (daemon,)) else: self.metrics.add_metric(path, Metric( stattype, @@ -574,12 +567,10 @@ class Module(MgrModule): counter_info['description'], ("ceph_daemon",), )) - self.metrics.append(path, value, (daemon,)) + self.metrics.set(path, value, (daemon,)) - # It is sufficient to reset the pending metrics once per scrape - self.metrics.reset() - return self.metrics.metrics + return self.metrics.all() def get_file_sd_config(self): servers = self.list_servers() @@ -656,11 +647,25 @@ class Module(MgrModule): @cherrypy.expose def metrics(self): - if global_instance().have_mon_connection(): - metrics = global_instance().collect() + inst = global_instance() + # Lock the function execution + try: + inst.collect_lock.acquire() + return self._metrics(inst) + finally: + 
inst.collect_lock.release() + + def _metrics(self, inst): + # Return cached data if available and collected before the cache times out + if inst.collect_cache and time.time() - inst.collect_time < inst.collect_timeout: + return inst.collect_cache + + if inst.have_mon_connection(): + metrics = inst.collect() cherrypy.response.headers['Content-Type'] = 'text/plain' if metrics: - return self.format_metrics(metrics) + inst.collect_cache = self.format_metrics(metrics) + return inst.collect_cache else: raise cherrypy.HTTPError(503, 'No MON connection') -- 2.39.5