DISK_OCCUPATION = ('instance', 'device', 'ceph_daemon')
-class Metric(object):
- def __init__(self, mtype, name, desc, labels=None):
- self.mtype = mtype
- self.name = name
- self.desc = desc
- self.labelnames = labels # tuple if present
- self.value = dict() # indexed by label values
-
- def set(self, value, labelvalues=None):
- # labelvalues must be a tuple
- labelvalues = labelvalues or ('',)
- self.value[labelvalues] = value
-
- def str_expfmt(self):
-
- def promethize(path):
- ''' replace illegal metric name characters '''
- result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')
-
- # Hyphens usually turn into underscores, unless they are
- # trailing
- if result.endswith("-"):
- result = result[0:-1] + "_minus"
- else:
- result = result.replace("-", "_")
-
- return "ceph_{0}".format(result)
-
- def floatstr(value):
- ''' represent as Go-compatible float '''
- if value == float('inf'):
- return '+Inf'
- if value == float('-inf'):
- return '-Inf'
- if math.isnan(value):
- return 'NaN'
- return repr(float(value))
-
- name = promethize(self.name)
- expfmt = '''
-# HELP {name} {desc}
-# TYPE {name} {mtype}'''.format(
- name=name,
- desc=self.desc,
- mtype=self.mtype,
- )
-
- for labelvalues, value in self.value.items():
- if self.labelnames:
- labels = zip(self.labelnames, labelvalues)
- labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
- else:
- labels = ''
- if labels:
- fmtstr = '\n{name}{{{labels}}} {value}'
- else:
- fmtstr = '\n{name} {value}'
- expfmt += fmtstr.format(
- name=name,
- labels=labels,
- value=floatstr(value),
- )
- return expfmt
-
-
-class Module(MgrModule):
- COMMANDS = [
- {
- "cmd": "prometheus self-test",
- "desc": "Run a self test on the prometheus module",
- "perm": "rw"
- },
- ]
-
- def __init__(self, *args, **kwargs):
- super(Module, self).__init__(*args, **kwargs)
+class Metrics(object):
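+    '''
+    Container for the Metric objects exported by the module.
+
+    Static metrics (e.g. cluster health) are written directly with set().
+    Daemon- and pool-specific metrics are staged with append() and applied
+    in one go with reset(), so that values of daemons that disappeared
+    between two scrapes are dropped. Illustrative sketch (the pool id and
+    name are made up):
+
+        metrics = Metrics()
+        metrics.append('pool_metadata', 1, (1, 'rbd'))
+        metrics.reset()  # replaces any previously applied pool_metadata values
+    '''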
+ def __init__(self):
self.metrics = self._setup_static_metrics()
- self.schema = OrderedDict()
- _global_instance['plugin'] = self
+ self.pending = {}
+
+ def set(self, key, value, labels=('',)):
+ '''
+        Set the value of a single metric. This should be used for static
+        metrics, e.g. cluster health.
+ '''
+ self.metrics[key].set(value, labels)
+
+    def append(self, key, value, labels=('',)):
+        '''
+        Append a metric value to the staging area. Use this to aggregate
+        daemon-specific metrics that can come and go as daemons are added or
+        removed.
+ '''
+ if key not in self.pending:
+ self.pending[key] = []
+ self.pending[key].append((labels, value))
+
+ def reset(self):
+ '''
+        Once metrics aggregation is done, call Metrics.reset() to apply the
+        aggregated metrics. This replaces all label -> value mappings of a
+        metric with the new mappings collected in the staging area (pending),
+        so that metrics of daemons that no longer exist are removed.
+ '''
+ for k, v in self.pending.items():
+ self.metrics[k].reset(v)
+ self.pending = {}
+
+ def add_metric(self, path, metric):
+ if path not in self.metrics:
+ self.metrics[path] = metric
+
def _setup_static_metrics(self):
metrics = {}
)
for state in OSD_STATUS:
path = 'osd_{}'.format(state)
- self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'untyped',
path,
)
for stat in OSD_STATS:
path = 'osd_{}'.format(stat)
- self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'gauge',
path,
)
for state in PG_STATES:
path = 'pg_{}'.format(state)
- self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'gauge',
path,
)
for state in DF_CLUSTER:
path = 'cluster_{}'.format(state)
- self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'gauge',
path,
)
for state in DF_POOL:
path = 'pool_{}'.format(state)
- self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'gauge',
path,
return metrics
+
+
+class Metric(object):
+ def __init__(self, mtype, name, desc, labels=None):
+ self.mtype = mtype
+ self.name = name
+ self.desc = desc
+ self.labelnames = labels # tuple if present
+ self.value = {} # indexed by label values
+
+ def set(self, value, labelvalues=None):
+ # labelvalues must be a tuple
+ labelvalues = labelvalues or ('',)
+ self.value[labelvalues] = value
+
+ def reset(self, values):
+ self.value = {}
+ for labelvalues, value in values:
+ self.value[labelvalues] = value
+
+ def str_expfmt(self):
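+        '''
+        Render this metric in the Prometheus text exposition format.
+        Illustrative shape of the output:
+
+            # HELP ceph_<name> <desc>
+            # TYPE ceph_<name> <mtype>
+            ceph_<name>{<label>="<value>"} <float value>
+        '''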
+
+ def promethize(path):
+ ''' replace illegal metric name characters '''
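+            # e.g. 'osd.op_w' becomes 'ceph_osd_op_w'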
+ result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')
+
+ # Hyphens usually turn into underscores, unless they are
+ # trailing
+ if result.endswith("-"):
+ result = result[0:-1] + "_minus"
+ else:
+ result = result.replace("-", "_")
+
+ return "ceph_{0}".format(result)
+
+ def floatstr(value):
+ ''' represent as Go-compatible float '''
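+            # e.g. 2 -> '2.0', float('inf') -> '+Inf', float('nan') -> 'NaN'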
+ if value == float('inf'):
+ return '+Inf'
+ if value == float('-inf'):
+ return '-Inf'
+ if math.isnan(value):
+ return 'NaN'
+ return repr(float(value))
+
+ name = promethize(self.name)
+ expfmt = '''
+# HELP {name} {desc}
+# TYPE {name} {mtype}'''.format(
+ name=name,
+ desc=self.desc,
+ mtype=self.mtype,
+ )
+
+ for labelvalues, value in self.value.items():
+ if self.labelnames:
+ labels = zip(self.labelnames, labelvalues)
+ labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
+ else:
+ labels = ''
+ if labels:
+ fmtstr = '\n{name}{{{labels}}} {value}'
+ else:
+ fmtstr = '\n{name} {value}'
+ expfmt += fmtstr.format(
+ name=name,
+ labels=labels,
+ value=floatstr(value),
+ )
+ return expfmt
+
+
+class Module(MgrModule):
+ COMMANDS = [
+ {
+ "cmd": "prometheus self-test",
+ "desc": "Run a self test on the prometheus module",
+ "perm": "rw"
+ },
+ ]
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self.metrics = Metrics()
+ self.schema = OrderedDict()
+ _global_instance['plugin'] = self
+
def get_health(self):
health = json.loads(self.get('health')['json'])
- self.metrics['health_status'].set(
- health_status_to_number(health['status'])
+ self.metrics.set('health_status',
+ health_status_to_number(health['status'])
)
def get_df(self):
# maybe get the to-be-exported metrics from a config?
df = self.get('df')
for stat in DF_CLUSTER:
- path = 'cluster_{}'.format(stat)
- self.metrics[path].set(df['stats'][stat])
+ self.metrics.set('cluster_{}'.format(stat), df['stats'][stat])
for pool in df['pools']:
for stat in DF_POOL:
- path = 'pool_{}'.format(stat)
- self.metrics[path].set(pool['stats'][stat], (pool['id'],))
+ self.metrics.append('pool_{}'.format(stat),
+ pool['stats'][stat],
+ (pool['id'],))
def get_quorum_status(self):
mon_status = json.loads(self.get('mon_status')['json'])
- self.metrics['mon_quorum_count'].set(len(mon_status['quorum']))
+ self.metrics.set('mon_quorum_count', len(mon_status['quorum']))
+        # maybe rather use mon_in_quorum{id=$rank} [0,1]?
def get_pg_status(self):
# TODO add per pool status?
pg_status = self.get('pg_status')
# Set total count of PGs, first
- self.metrics['pg_total'].set(
- pg_status['num_pgs'],
- )
+ self.metrics.set('pg_total', pg_status['num_pgs'])
reported_states = {}
for pg in pg_status['pgs_by_state']:
for state in reported_states:
path = 'pg_{}'.format(state)
try:
- self.metrics[path].set(reported_states[state])
+ self.metrics.set(path, reported_states[state])
except KeyError:
self.log.warn("skipping pg in unknown state {}".format(state))
for state in PG_STATES:
- path = 'pg_{}'.format(state)
if state not in reported_states:
try:
- self.metrics[path].set(0)
+ self.metrics.set('pg_{}'.format(state), 0)
except KeyError:
self.log.warn("skipping pg in unknown state {}".format(state))
for osd in osd_stats['osd_stats']:
id_ = osd['osd']
for stat in OSD_STATS:
- status = osd['perf_stat'][stat]
- self.metrics['osd_{}'.format(stat)].set(
- status,
- ('osd.{}'.format(id_),))
+ val = osd['perf_stat'][stat]
+ self.metrics.append('osd_{}'.format(stat), val,
+ ('osd.{}'.format(id_),))
def get_metadata_and_osd_status(self):
osd_map = self.get('osd_map')
)
osd_devices = self.get('osd_map_crush')['devices']
for osd in osd_map['osds']:
+ # id can be used to link osd metrics and metadata
id_ = osd['osd']
+ # collect osd metadata
p_addr = osd['public_addr'].split(':')[0]
c_addr = osd['cluster_addr'].split(':')[0]
if p_addr == "-" or c_addr == "-":
)
continue
dev_class = next((osd for osd in osd_devices if osd['id'] == id_))
- self.metrics['osd_metadata'].set(1, (
- c_addr,
- dev_class['class'],
- id_,
- p_addr
- ))
+        self.metrics.append('osd_metadata', 1, (
+            c_addr,
+            dev_class.get('class', ''),
+            id_,
+            p_addr
+        ))
+
+ # collect osd status
for state in OSD_STATUS:
status = osd[state]
- self.metrics['osd_{}'.format(state)].set(
- status,
- ('osd.{}'.format(id_),))
+ self.metrics.append('osd_{}'.format(state), status,
+ ('osd.{}'.format(id_),))
+ # collect disk occupation metadata
osd_metadata = self.get_metadata("osd", str(id_))
if osd_metadata is None:
continue
if osd_dev_node and osd_hostname:
self.log.debug("Got dev for osd {0}: {1}/{2}".format(
id_, osd_hostname, osd_dev_node))
- self.metrics['disk_occupation'].set(1, (
+ self.metrics.set('disk_occupation', 1, (
osd_hostname,
osd_dev_node,
"osd.{0}".format(id_)
self.log.info("Missing dev node metadata for osd {0}, skipping "
"occupation record for this osd".format(id_))
for pool in osd_map['pools']:
- id_ = pool['pool']
- name = pool['pool_name']
- self.metrics['pool_metadata'].set(1, (id_, name))
+            self.metrics.append('pool_metadata', 1,
+                                (pool['pool'], pool['pool_name']))
def collect(self):
self.get_health()
self.get_metadata_and_osd_status()
self.get_pg_status()
- for daemon, counters in self.get_all_perf_counters().iteritems():
+ for daemon, counters in self.get_all_perf_counters().items():
for path, counter_info in counters.items():
stattype = self._stattype_to_str(counter_info['type'])
# XXX simplify first effort: no histograms
self.log.debug('ignoring %s, type %s' % (path, stattype))
continue
- if path not in self.metrics:
- self.metrics[path] = Metric(
+ self.metrics.add_metric(path, Metric(
stattype,
path,
counter_info['description'],
("ceph_daemon",),
- )
+ ))
- self.metrics[path].set(
- counter_info['value'],
- (daemon,)
- )
+ self.metrics.append(path, counter_info['value'], (daemon,))
+ # It is sufficient to reset the pending metrics once per scrape
+ self.metrics.reset()
- return self.metrics
+ return self.metrics.metrics
def handle_command(self, cmd):
if cmd['prefix'] == 'prometheus self-test':