import os
import socket
import threading
-from collections import OrderedDict
+import time
from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
# Defaults for the Prometheus HTTP server. Can also be set in config-key
class Metrics(object):
def __init__(self):
+ # Registry of Metric objects keyed by metric name, seeded with the static metrics
self.metrics = self._setup_static_metrics()
- self.pending = {}
- def set(self, key, value, labels=('',)):
+ def clear(self):
'''
- Set the value of a single Metrics. This should be used for static metrics,
- e.g. cluster health.
+ Drop the recorded values of every registered metric. The Metric objects
+ themselves stay registered, so they can be filled again on the next scrape.
'''
- self.metrics[key].set(value, labels)
+ # Iterate the Metric objects directly instead of re-indexing the dict by key
+ for metric in self.metrics.values():
+ metric.clear()
- def append(self, key, value, labels = ('',)):
+ def set(self, key, value, labels=('',)):
'''
- Append a metrics to the staging area. Use this to aggregate daemon specific
- metrics that can appear and go away as daemons are added or removed.
+ Set the value of the metric registered under key for the given tuple of
+ label values. The metric must already be registered (see add_metric);
+ an unknown key raises KeyError.
'''
- if key not in self.pending:
- self.pending[key] = []
- self.pending[key].append((labels, value))
+ self.metrics[key].set(value, labels)
- def reset(self):
+ def all(self):
'''
- When metrics aggregation is done, call Metrics.reset() to apply the
- aggregated metric. This will remove all label -> value mappings for a
- metric and set the new mapping (from pending). This means daemon specific
- metrics os daemons that do no longer exist, are removed.
+ Return the dict of all registered metrics, keyed by metric name. This is
+ the live internal dict, not a copy.
'''
- for k, v in self.pending.items():
- self.metrics[k].reset(v)
- self.pending = {}
+ return self.metrics
def add_metric(self, path, metric):
+ '''
+ Register metric under path. If path is already registered the existing
+ Metric is kept and the new one is ignored.
+ '''
if path not in self.metrics:
self.metrics[path] = metric
-
def _setup_static_metrics(self):
metrics = {}
metrics['health_status'] = Metric(
return metrics
-
class Metric(object):
def __init__(self, mtype, name, desc, labels=None):
self.mtype = mtype
self.labelnames = labels # tuple if present
self.value = {} # indexed by label values
+ def clear(self):
+ '''
+ Forget every recorded label -> value pair; nothing else on the Metric
+ is touched.
+ '''
+ self.value = {}
+
def set(self, value, labelvalues=None):
+ '''
+ Record value for the given tuple of label values, replacing any value
+ previously recorded for that tuple. A falsy labelvalues falls back to ('',).
+ '''
# labelvalues must be a tuple
labelvalues = labelvalues or ('',)
self.value[labelvalues] = value
- def reset(self, values):
- self.value = {}
- for labelvalues, value in values:
- self.value[labelvalues] = value
-
def str_expfmt(self):
def promethize(path):
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self.metrics = Metrics()
- self.schema = OrderedDict()
self.shutdown_event = threading.Event()
+ # Scrape cache state used by the HTTP metrics handler.
+ # Re-entrant lock that serializes scrapes
+ self.collect_lock = threading.RLock()
+ # time.time() value the cache freshness check compares against
+ # NOTE(review): confirm it is refreshed whenever collect_cache is stored
+ self.collect_time = 0
+ # Maximum age of the cached output, in seconds
+ self.collect_timeout = 5.0
+ # Last formatted metrics output; None until a scrape has produced one
+ self.collect_cache = None
_global_instance['plugin'] = self
def get_health(self):
for pool in df['pools']:
for stat in DF_POOL:
- self.metrics.append('pool_{}'.format(stat),
+ self.metrics.set('pool_{}'.format(stat),
pool['stats'][stat],
(pool['id'],))
for fs in fs_map['filesystems']:
# collect fs metadata
data_pools = ",".join([str(pool) for pool in fs['mdsmap']['data_pools']])
- self.metrics.append('fs_metadata', 1,
+ self.metrics.set('fs_metadata', 1,
(data_pools,
fs['id'],
fs['mdsmap']['metadata_pool'],
for gid, daemon in fs['mdsmap']['info'].items():
id_ = daemon['name']
host_version = servers.get((id_, 'mds'), ('',''))
- self.metrics.append('mds_metadata', 1,
+ self.metrics.set('mds_metadata', 1,
('mds.{}'.format(id_), fs['id'],
host_version[0], daemon['addr'],
daemon['rank'], host_version[1]))
rank = mon['rank']
id_ = mon['name']
host_version = servers.get((id_, 'mon'), ('',''))
- self.metrics.append('mon_metadata', 1,
+ self.metrics.set('mon_metadata', 1,
('mon.{}'.format(id_), host_version[0],
mon['public_addr'].split(':')[0], rank,
host_version[1]))
in_quorum = int(rank in mon_status['quorum'])
- self.metrics.append('mon_quorum_status', in_quorum,
+ self.metrics.set('mon_quorum_status', in_quorum,
('mon.{}'.format(id_),))
def get_pg_status(self):
id_ = osd['osd']
for stat in OSD_STATS:
val = osd['perf_stat'][stat]
- self.metrics.append('osd_{}'.format(stat), val,
+ self.metrics.set('osd_{}'.format(stat), val,
('osd.{}'.format(id_),))
def get_service_list(self):
host_version = servers.get((str(id_), 'osd'), ('',''))
- self.metrics.append('osd_metadata', 1, (
+ self.metrics.set('osd_metadata', 1, (
'osd.{}'.format(id_),
c_addr,
dev_class,
# collect osd status
for state in OSD_STATUS:
status = osd[state]
- self.metrics.append('osd_{}'.format(state), status,
+ self.metrics.set('osd_{}'.format(state), status,
('osd.{}'.format(id_),))
# collect disk occupation metadata
pool_meta = []
for pool in osd_map['pools']:
- self.metrics.append('pool_metadata', 1, (pool['pool'], pool['pool_name']))
+ self.metrics.set('pool_metadata', 1, (pool['pool'], pool['pool_name']))
# Populate rgw_metadata
for key, value in servers.items():
if service_type != 'rgw':
continue
hostname, version = value
- self.metrics.append(
+ self.metrics.set(
'rgw_metadata',
1,
('{}.{}'.format(service_type, service_id), hostname, version)
self.metrics.set(stat, pg_sum[stat])
def collect(self):
+ # Clear the metrics before scraping
+ self.metrics.clear()
+
self.get_health()
self.get_df()
self.get_fs()
counter_info['description'] + ' Total',
("ceph_daemon",),
))
- self.metrics.append(_path, value, (daemon,))
+ self.metrics.set(_path, value, (daemon,))
_path = path + '_count'
self.metrics.add_metric(_path, Metric(
counter_info['description'] + ' Count',
("ceph_daemon",),
))
- self.metrics.append(_path, counter_info['count'], (daemon,))
+ self.metrics.set(_path, counter_info['count'], (daemon,))
else:
self.metrics.add_metric(path, Metric(
stattype,
counter_info['description'],
("ceph_daemon",),
))
- self.metrics.append(path, value, (daemon,))
+ self.metrics.set(path, value, (daemon,))
- # It is sufficient to reset the pending metrics once per scrape
- self.metrics.reset()
- return self.metrics.metrics
+ return self.metrics.all()
def get_file_sd_config(self):
servers = self.list_servers()
@cherrypy.expose
def metrics(self):
- if global_instance().have_mon_connection():
- metrics = global_instance().collect()
+ inst = global_instance()
+ # Serialize scrapes: collect() clears and refills the shared Metrics
+ # object, so only one request may run it at a time. The with-statement
+ # releases the lock only if it was actually acquired.
+ with inst.collect_lock:
+ return self._metrics(inst)
+
+ def _metrics(self, inst):
+ '''
+ Build the scrape response for inst. The caller must hold inst.collect_lock.
+ Raises cherrypy.HTTPError(503) when there is no MON connection.
+ '''
+ # Serve the cached payload while it is younger than collect_timeout seconds
+ if inst.collect_cache and time.time() - inst.collect_time < inst.collect_timeout:
+ # The cached path must advertise the same content type as a fresh scrape
+ cherrypy.response.headers['Content-Type'] = 'text/plain'
+ return inst.collect_cache
+
+ if inst.have_mon_connection():
+ metrics = inst.collect()
cherrypy.response.headers['Content-Type'] = 'text/plain'
if metrics:
- return self.format_metrics(metrics)
+ inst.collect_cache = self.format_metrics(metrics)
+ # Stamp the cache; without this collect_time stays at its initial 0
+ # and the freshness check above can never succeed
+ inst.collect_time = time.time()
+ return inst.collect_cache
else:
raise cherrypy.HTTPError(503, 'No MON connection')