+++ /dev/null
-../../../../cephmetrics/
\ No newline at end of file
--- /dev/null
+../../../../cephmetrics.py
\ No newline at end of file
--- /dev/null
+../../../../cephmetrics_collectors
\ No newline at end of file
replace:
dest: "{{ collectd_conf_d }}/cephmetrics.conf"
regexp: 'ModulePath ".*"'
- replace: 'ModulePath "{{ collectd_cephmetrics_dir }}"'
+ replace: 'ModulePath "{{ collectd_dir }}"'
notify: Restart collectd
set_fact:
collectd_dir: "/usr/lib{{ '64' if ansible_pkg_mgr == 'yum' else '' }}/collectd"
-- name: Set collectd_cephmetrics_dir
+- name: Set collectors_dir
set_fact:
- collectd_cephmetrics_dir: "{{ collectd_dir }}/cephmetrics"
+ collectors_dir: "{{ collectd_dir }}/cephmetrics_collectors"
- name: Remove stale Python files
file:
dest: "{{ item }}"
state: absent
with_items:
- - "{{ collectd_cephmetrics_dir }}/cephmetrics.py"
- - "{{ collectd_cephmetrics_dir }}/cephmetrics.pyc"
- - "{{ collectd_cephmetrics_dir }}/collectors"
+ - "{{ collectd_dir }}/cephmetrics"
+ - "{{ collectors_dir }}/collectors"
-- name: Ship collector plugins
+- name: Ship collectors
copy:
- src: files/cephmetrics
- dest: "{{ collectd_cephmetrics_dir }}"
+ src: files/cephmetrics_collectors/
+ dest: "{{ collectors_dir }}"
+ notify: Restart collectd
+
+- name: Ship collectd plugin
+ copy:
+ src: files/cephmetrics.py
+ dest: "{{ collectd_dir }}"
notify: Restart collectd
--- /dev/null
+#!/usr/bin/env python
+import logging
+import os
+import sys
+
+TEST_MODE = (sys.argv[0].split('/')[-1] == 'py.test')
+try:
+ import collectd
+except ImportError:
+ if not TEST_MODE:
+ raise
+
+from cephmetrics_collectors import (Ceph, common, iscsi, mon, osd, rgw)
+
+__author__ = 'Paul Cuzner'
+
+PLUGIN_NAME = 'cephmetrics'
+
+
+def write_stats(role_metrics, stats):
+
+ flat_stats = common.flatten_dict(stats, '.')
+ for key_name in flat_stats:
+ attr_name = key_name.split('.')[-1]
+
+ # TODO: this needs some more think time, since the key from the name
+ # is not the key of the all_metrics dict
+ if attr_name in role_metrics:
+ attr_type = role_metrics[attr_name][1] # gauge / derive etc
+ else:
+ # assign a default
+ attr_type = 'gauge'
+
+ attr_value = flat_stats[key_name]
+
+ val = collectd.Values(plugin=PLUGIN_NAME, type=attr_type)
+ instance_name = "{}.{}".format(CEPH.cluster_name,
+ key_name)
+ val.type_instance = instance_name
+ val.values = [attr_value]
+ val.dispatch()
+
+
+def configure_callback(conf):
+
+ valid_log_levels = ['debug', 'info']
+
+ global CEPH
+ module_parms = {node.key: node.values[0] for node in conf.children}
+
+ log_level = module_parms.get('LogLevel', 'debug')
+ if log_level not in valid_log_levels:
+ collectd.error("LogLevel specified is invalid - must"
+ " be :{}".format(' or '.join(valid_log_levels)))
+
+ if 'ClusterName' in module_parms:
+ cluster_name = module_parms['ClusterName']
+ # cluster name is all we need to get started
+ if not os.path.exists('/etc/ceph/{}.conf'.format(cluster_name)):
+ collectd.error("Clustername given ('{}') not found in "
+ "/etc/ceph".format(module_parms['ClusterName']))
+
+ # let's assume the conf file is OK to use
+ CEPH.cluster_name = cluster_name
+
+ setup_module_logging(log_level)
+
+ CEPH.probe()
+ collectd.info(
+ "{}: Roles detected - mon:{} osd:{} rgw:{} iscsi:{}".format(
+ __name__,
+ isinstance(CEPH.mon, mon.Mon),
+ isinstance(CEPH.osd, osd.OSDs),
+ isinstance(CEPH.rgw, rgw.RGW),
+ isinstance(CEPH.iscsi, iscsi.ISCSIGateway),
+ ))
+
+ else:
+ collectd.error("ClusterName is required")
+
+
+def setup_module_logging(log_level, path='/var/log/collectd-cephmetrics.log'):
+
+ level = {"debug": logging.DEBUG,
+ "info": logging.INFO}
+
+ logging.getLogger('cephmetrics')
+ log_conf = dict(
+ format='%(asctime)s - %(levelname)-7s - '
+               '[%(filename)s:%(lineno)s:%(funcName)s()] - '
+ '%(message)s',
+ level=level.get(log_level)
+ )
+ if path:
+ log_conf['filename'] = path
+ logging.basicConfig(**log_conf)
+
+
+def read_callback():
+
+ if CEPH.mon:
+ mon_stats = CEPH.mon.get_stats()
+ write_stats(mon.Mon.all_metrics, mon_stats)
+
+ if CEPH.rgw:
+ rgw_stats = CEPH.rgw.get_stats()
+ write_stats(rgw.RGW.all_metrics, rgw_stats)
+
+ if CEPH.osd:
+ osd_node_stats = CEPH.osd.get_stats()
+ write_stats(osd.OSDs.all_metrics, osd_node_stats)
+
+ if CEPH.iscsi:
+ iscsi_stats = CEPH.iscsi.get_stats()
+ write_stats(iscsi.ISCSIGateway.metrics, iscsi_stats)
+
+
+if TEST_MODE:
+ pass
+else:
+ CEPH = Ceph()
+ collectd.register_config(configure_callback)
+ collectd.register_read(read_callback)
+++ /dev/null
-#!/usr/bin/env python
-import sys
-TEST_MODE = (sys.argv[0].split('/')[-1] == 'py.test')
-
-import os
-import glob
-import logging
-
-try:
- import collectd
-except ImportError:
- if not TEST_MODE:
- raise
-
-from collectors import (common, iscsi, mon, osd, rgw)
-
-__author__ = 'Paul Cuzner'
-
-PLUGIN_NAME = 'cephmetrics'
-
-
-class Ceph(object):
- def __init__(self):
- self.cluster_name = None
- self.host_name = common.get_hostname()
-
- self.mon_socket = None
- self.rgw_socket = None
-
- self.mon = None
- self.rgw = None
- self.osd = None
- self.iscsi = None
-
- def probe(self):
- """
- set up which collector(s) to use, based on what types of sockets we
- find in /var/run/ceph
- """
-
- mon_socket = '/var/run/ceph/{}-mon.{}.asok'.format(self.cluster_name,
- self.host_name)
- if os.path.exists(mon_socket):
- self.mon_socket = mon_socket
- self.mon = mon.Mon(self.cluster_name,
- admin_socket=mon_socket)
-
- rgw_socket_list = glob.glob('/var/run/ceph/{}-client.rgw.*.'
- 'asok'.format(self.cluster_name))
-
- if rgw_socket_list:
- rgw_socket = rgw_socket_list[0]
- self.rgw = rgw.RGW(self.cluster_name,
- admin_socket=rgw_socket)
-
- osd_socket_list = glob.glob('/var/run/ceph/{}-osd.*'
- '.asok'.format(self.cluster_name))
- mounted = common.freadlines('/proc/mounts')
- osds_mounted = [mnt for mnt in mounted
- if mnt.split()[1].startswith('/var/lib/ceph')]
- if osd_socket_list or osds_mounted:
- self.osd = osd.OSDs(self.cluster_name)
-
- if os.path.exists('/sys/kernel/config/target/iscsi'):
- self.iscsi = iscsi.ISCSIGateway(self.cluster_name)
-
- collectd.info(
- "{}: Roles detected - mon:{} osd:{} rgw:{} iscsi:{}".format(
- __name__,
- isinstance(self.mon, mon.Mon),
- isinstance(self.osd, osd.OSDs),
- isinstance(self.rgw, rgw.RGW),
- isinstance(self.iscsi, iscsi.ISCSIGateway),
- ))
-
-
-def write_stats(role_metrics, stats):
-
- flat_stats = common.flatten_dict(stats, '.')
- for key_name in flat_stats:
- attr_name = key_name.split('.')[-1]
-
- # TODO: this needs some more think time, since the key from the name
- # is not the key of the all_metrics dict
- if attr_name in role_metrics:
- attr_type = role_metrics[attr_name][1] # gauge / derive etc
- else:
- # assign a default
- attr_type = 'gauge'
-
- attr_value = flat_stats[key_name]
-
- val = collectd.Values(plugin=PLUGIN_NAME, type=attr_type)
- instance_name = "{}.{}".format(CEPH.cluster_name,
- key_name)
- val.type_instance = instance_name
- val.values = [attr_value]
- val.dispatch()
-
-
-def configure_callback(conf):
-
- valid_log_levels = ['debug', 'info']
-
- global CEPH
- module_parms = {node.key: node.values[0] for node in conf.children}
-
- log_level = module_parms.get('LogLevel', 'debug')
- if log_level not in valid_log_levels:
- collectd.error("LogLevel specified is invalid - must"
- " be :{}".format(' or '.join(valid_log_levels)))
-
- if 'ClusterName' in module_parms:
- cluster_name = module_parms['ClusterName']
- # cluster name is all we need to get started
- if not os.path.exists('/etc/ceph/{}.conf'.format(cluster_name)):
- collectd.error("Clustername given ('{}') not found in "
- "/etc/ceph".format(module_parms['ClusterName']))
-
- # let's assume the conf file is OK to use
- CEPH.cluster_name = cluster_name
-
- setup_module_logging(log_level)
-
- CEPH.probe()
-
- else:
- collectd.error("ClusterName is required")
-
-
-def setup_module_logging(log_level, path='/var/log/collectd-cephmetrics.log'):
-
- level = {"debug": logging.DEBUG,
- "info": logging.INFO}
-
- logging.getLogger('cephmetrics')
- log_conf = dict(
- format='%(asctime)s - %(levelname)-7s - '
- '[%(filename)s:%(lineno)s:%(funcName)s() - '
- '%(message)s',
- level=level.get(log_level)
- )
- if path:
- log_conf['filename'] = path
- logging.basicConfig(**log_conf)
-
-
-def read_callback():
-
- if CEPH.mon:
- mon_stats = CEPH.mon.get_stats()
- write_stats(mon.Mon.all_metrics, mon_stats)
-
- if CEPH.rgw:
- rgw_stats = CEPH.rgw.get_stats()
- write_stats(rgw.RGW.all_metrics, rgw_stats)
-
- if CEPH.osd:
- osd_node_stats = CEPH.osd.get_stats()
- write_stats(osd.OSDs.all_metrics, osd_node_stats)
-
- if CEPH.iscsi:
- iscsi_stats = CEPH.iscsi.get_stats()
- write_stats(iscsi.ISCSIGateway.metrics, iscsi_stats)
-
-
-if TEST_MODE:
- pass
-else:
- CEPH = Ceph()
- collectd.register_config(configure_callback)
- collectd.register_read(read_callback)
+++ /dev/null
-#!/usr/bin/env python
-
-import json
-import time
-import logging
-
-from ceph_daemon import admin_socket
-
-
-class BaseCollector(object):
-
- def __init__(self, cluster_name, admin_socket=None):
- self.cluster_name = cluster_name
- self.admin_socket = admin_socket
-
- self.logger = logging.getLogger('cephmetrics')
-
- def _admin_socket(self, cmds=None, socket_path=None):
-
- adm_socket = self.admin_socket if not socket_path else socket_path
-
- if not cmds:
- cmds = ['perf', 'dump']
-
- start = time.time()
- response = admin_socket(adm_socket, cmds,
- format='json')
- end = time.time()
-
- self.logger.debug("admin_socket call '{}' : "
- "{:.3f}s".format(' '.join(cmds),
- (end - start)))
-
- return json.loads(response)
-
- def get_stats(self):
-
- return {}
+++ /dev/null
-#!/usr/bin/env python
-
-
-import socket
-from os import statvfs
-import math
-
-
-def get_hostname():
- return socket.gethostname().split('.')[0]
-
-
-def add_dicts(dict1, dict2):
- """
- Add dictionary values together
- :param dict1:
- :param dict2:
- :return: dict with matching fields sum'd together
- """
- return {key: dict1.get(key, 0) + dict2.get(key, 0)
- for key in set(dict1).union(dict2)}
-
-
-def merge_dicts(dict1, dict2):
- """
- merges two dicts together to form a single dict. when dict keys overlap
- the value in the 2nd dict takes precedence
- :param dict1:
- :param dict2:
- :return: combined dict
- """
-
- new = dict1.copy()
- new.update(dict2)
-
- return new
-
-
-def flatten_dict(data, separator='.', prefix=''):
- """
- flatten a dict, so it is just simple key/value pairs
- :param data: (dict)
- :param separator: (str) char to use when combining keys
- :param prefix: key prefix
- :return:
- """
- return {prefix + separator + k if prefix else k: v
- for kk, vv in data.items()
- for k, v in flatten_dict(vv, separator, kk).items()
- } if isinstance(data, dict) else {prefix: data}
-
-
-def todict(obj):
- """
- convert an object to a dict representation
- :param obj: (object) object to examine, to extract variables/values from
- :return: (dict) representation of the given object
- """
- data = {}
- for key, value in obj.__dict__.iteritems():
-
- if key.startswith('_'):
- continue
-
- try:
- data[key] = todict(value)
- except AttributeError:
- data[key] = value
-
- return data
-
-
-def fread(file_name=None):
- """
- Simple read function for files of a single value
- :param file_name: (str) file name to read
- :return: (str) contents of the file
- """
-
- with open(file_name, 'r') as f:
- setting = f.read().rstrip()
- return setting
-
-
-def freadlines(file_name=None):
- """
- simple readlines function to return all records of a given file
- :param file_name: (str) file name to read
- :return: (list) contents of the file
- """
-
- with open(file_name, 'r') as f:
- data = f.readlines()
- return data
-
-
-class IOstat(object):
- raw_metrics = [
- "_reads",
- "_reads_mrgd",
- "_sectors_read",
- "_read_ms",
- "_writes",
- "_writes_mrgd",
- "_sectors_written",
- "_write_ms",
- "_current_io",
- "_ms_active_io",
- "_ms_active_io_w"
- ]
-
- sector_size = 512
-
- metrics = {
- "iops": ("iops", "gauge"),
- "r_iops": ("r_iops", "gauge"),
- "w_iops": ("w_iops", "gauge"),
- "bytes_per_sec": ("bytes_per_sec", "gauge"),
- "r_bytes_per_sec": ("r_bytes_per_sec", "gauge"),
- "w_bytes_per_sec": ("w_bytes_per_sec", "gauge"),
- "util": ("util", "gauge"),
- "await": ("await", "gauge"),
- "r_await": ("r_await", "gauge"),
- "w_await": ("w_await", "gauge"),
- }
-
- def __init__(self):
- self._previous = []
- self._current = []
-
- # Seed the metrics we're interested in
- for ctr in IOstat.metrics.keys():
- setattr(self, ctr, 0)
-
- def __str__(self):
- s = '\n- IOstat object:\n'
- for key in sorted(vars(self)):
- s += '\t{} ... {}\n'.format(key, getattr(self, key))
- return s
-
- def _calc_raw_delta(self):
- if not self._previous:
- # nothing to compute yet
- for ptr in range(len(IOstat.raw_metrics)):
- key = IOstat.raw_metrics[ptr]
- setattr(self, key, 0)
- else:
- for ptr in range(len(IOstat.raw_metrics)):
- key = IOstat.raw_metrics[ptr]
- setattr(self, key, (int(self._current[ptr]) -
- int(self._previous[ptr])))
-
- def compute(self, sample_interval):
- """
- Calculate the iostats for this device
- """
-
- self._calc_raw_delta()
-
- if sample_interval > 0:
- interval_ms = sample_interval * 1000
- total_io = self._reads + self._writes
- self.util = float(self._ms_active_io) / interval_ms * 100
- self.iops = int(total_io) / sample_interval
- self.r_iops = int(self._reads) / sample_interval
- self.w_iops = int(self._writes) / sample_interval
- self.await = (float(self._write_ms + self._read_ms) / total_io if
- total_io > 0 else 0)
- self.w_await = float(
- self._write_ms) / self._writes if self._writes > 0 else 0
- self.r_await = float(
- self._read_ms) / self._reads if self._reads > 0 else 0
- self.r_bytes_per_sec = (float(
- self._sectors_read * IOstat.sector_size)) / sample_interval
- self.w_bytes_per_sec = (float(
- self._sectors_written * IOstat.sector_size)) / sample_interval
- self.bytes_per_sec = self.r_bytes_per_sec + self.w_bytes_per_sec
-
-
-class Disk(object):
-
- metrics = {
- "rotational": ("rotational", "gauge"),
- "disk_size": ("disk_size", "gauge"),
- "fs_size": ("fs_size", "gauge"),
- "fs_used": ("fs_used", "gauge"),
- "fs_percent_used": ("fs_percent_used", "gauge"),
- "osd_id": ("osd_id", "gauge")
- }
-
- osd_types = {"filestore": 0, "bluestore": 1}
-
- def __init__(self, device_name, path_name=None, osd_id=None,
- in_osd_type="filestore", encrypted=0):
-
- self._name = device_name
- self._path_name = path_name
- self._base_dev = Disk.get_base_dev(device_name)
- self.osd_id = osd_id
-
- self.rotational = self._get_rota()
- self.disk_size = self._get_size()
- self.perf = IOstat()
- self.fs_size = 0
- self.fs_percent_used = 0
- self.fs_used = 0
- self.encrypted = encrypted
- self.osd_type = Disk.osd_types[in_osd_type]
-
- self.refresh()
-
- def _get_size(self):
- return int(fread("/sys/block/{}/size".format(self._base_dev))) * 512
-
- def _get_rota(self):
- return int(
- fread("/sys/block/{}/queue/rotational".format(self._base_dev)))
-
- def _get_fssize(self):
- s = statvfs("{}/whoami".format(self._path_name))
- fs_size = s.f_blocks * s.f_bsize
- fs_used = fs_size - (s.f_bfree * s.f_bsize)
- fs_percent_used = math.ceil((float(fs_used) / fs_size) * 100)
- return fs_size, fs_used, fs_percent_used
-
- def refresh(self):
- # only run the fs size update, if the _path_name is set.
- if self._path_name:
- self.fs_size, self.fs_used, self.fs_percent_used = \
- self._get_fssize()
-
- @staticmethod
- def get_base_dev(dev_name):
-
- # for intelcas devices, just use the device name as is
- if dev_name.startswith('intelcas'):
- device = dev_name
- elif dev_name.startswith('nvme'):
- if 'p' in dev_name:
- device = dev_name[:(dev_name.index('p'))]
- else:
- device = dev_name
- else:
- # default strip any numeric ie. sdaa1 -> sdaa
- device = filter(lambda ch: ch.isalpha(), dev_name)
-
- return device
+++ /dev/null
-#!/usr/bin/env python2
-
-# requires python-rtslib_fb for LIO interaction
-#
-# NB. the rtslib_fb module is dynamically loaded by the ISCSIGateway
-# class instantiation. This prevents import errors within the generic parent
-# module cephmetrics
-#
-import os
-import sys
-import time
-
-from cephmetrics.collectors import (base, common)
-
-
-class Client(object):
-
- def __init__(self, iqn):
- self.iqn = iqn
- self.name = iqn.replace('.', '-')
- self.luns = {}
- self.lun_count = 0
- self._cycle = 0
-
- def dump(self):
- client_dump = {}
- lun_info = {}
- client_dump[self.name] = {"luns": {},
- "lun_count": self.lun_count}
- for lun_name in self.luns:
- lun = self.luns[lun_name]
- lun_info.update(lun.dump())
-
- return {self.name: {"luns": lun_info,
- "lun_count": len(lun_info)}
- }
-
-
-class LUN(object):
-
- def __init__(self, client, tpg_lun):
- self._path = tpg_lun.storage_object.path
- self._tpg_lun = tpg_lun
- self._name = tpg_lun.storage_object.name
- self._display_name = tpg_lun.storage_object.name.replace('.', "-")
- self._so = tpg_lun.storage_object
- self._client = client
- self._cycle = 0
- self.size = 0
- self.iops = 0
- self.read_bytes_per_sec = 0
- self.write_bytes_per_sec = 0
- self.total_bytes_per_sec = 0
- self.active_path = 0
-
- def refresh(self, cycle_id):
- self._cycle = cycle_id
- self.size = self._so.size
- stats_path = os.path.join(self._path, 'statistics/scsi_lu')
- self.iops = int(common.fread(os.path.join(stats_path, "num_cmds")))
- read_mb = float(common.fread(os.path.join(stats_path, "read_mbytes")))
- write_mb = float(
- common.fread(os.path.join(stats_path, "write_mbytes")))
- self.read_bytes_per_sec = int(read_mb * 1024 ** 2)
- self.write_bytes_per_sec = int(write_mb * 1024 ** 2)
- self.total_bytes_per_sec = self.read_bytes_per_sec + \
- self.write_bytes_per_sec
-
- if self._tpg_lun.alua_tg_pt_gp_name == 'ao':
- self.active_path = 1
- else:
- self.active_path = 0
-
- def dump(self):
- return {self._display_name: {k: getattr(self, k) for k in self.__dict__
- if not k.startswith("_")}}
-
-
-class ISCSIGateway(base.BaseCollector):
- """
- created on a host that has a /sys/kernel/config/target/iscsi dir
- i.e. there is an iscsi gateway here!
- """
-
- metrics = {
- "lun_count": ("lun_count", "gauge"),
- "client_count": ("client_count", "gauge"),
- "tpg_count": ("tpg_count", "gauge"),
- "sessions": ("sessions", "gauge"),
- "capacity": ("capacity", "gauge"),
- "iops": ("iops", "derive"),
- "read_bytes_per_sec": ("read_bytes_per_sec", "derive"),
- "write_bytes_per_sec": ("write_bytes_per_sec", "derive"),
- "total_bytes_per_sec": ("total_bytes_per_sec", "derive")
- }
-
- def __init__(self, *args, **kwargs):
- base.BaseCollector.__init__(self, *args, **kwargs)
-
- # Since the module can be imported by a parent class but not
- # instantiated, the rtslib import is deferred until the first instance
- # of the the class is created. This keeps the parent module simple
- # and more importantly generic
- if 'rtslib_fb.root' not in sys.modules.keys():
-
- try:
- import rtslib_fb.root as RTSRoot
- except ImportError:
- raise ImportError("rtslib_fb python package is missing")
-
- self._root = RTSRoot()
-
- self.clients = {}
- self.cycle = 0
-
- self.iops = 0
- self.read_bytes_per_sec = 0
- self.write_bytes_per_sec = 0
- self.total_bytes_per_sec = 0
-
- def refresh(self):
- """
- populate the instance by exploring rtslib
- """
-
- self.iops = 0
- self.read_bytes_per_sec = 0
- self.write_bytes_per_sec = 0
- self.total_bytes_per_sec = 0
-
- if self.cycle == 10:
- self.cycle = 0
- else:
- self.cycle += 1
-
- for node_acl in self._root.node_acls:
-
- client_name = node_acl.node_wwn
-
- if client_name not in self.clients:
- new_client = Client(client_name)
- self.clients[client_name] = new_client
-
- client = self.clients[client_name]
- client.lun_count = 0
- client._cycle = self.cycle
-
- for lun in node_acl.mapped_luns:
- client.lun_count += 1
- tpg_lun = lun.tpg_lun
- lun_name = tpg_lun.storage_object.name
- if lun_name not in client.luns:
- lun = LUN(client, tpg_lun)
- client.luns[lun._name] = lun
- else:
- lun = client.luns[lun_name]
-
- lun.refresh(self.cycle)
-
- self.iops += lun.iops
- self.read_bytes_per_sec += lun.read_bytes_per_sec
- self.write_bytes_per_sec += lun.write_bytes_per_sec
- self.total_bytes_per_sec = self.read_bytes_per_sec + \
- self.write_bytes_per_sec
-
- def prune(self):
- """
- drop child objects held by the instance, that are no longer in the
- iSCSI config i.e. don't report on old information
- """
-
- for client_name in self.clients:
- client = self.clients[client_name]
-
- for lun_name in client.luns:
- lun = client.luns[lun_name]
- if lun._cycle != self.cycle:
- # drop the lun entry
- self.logger.debug("pruning LUN '{}'".format(lun_name))
-
- del client.luns[lun_name]
-
- if client._cycle != self.cycle:
- # drop the client entry
- self.logger.debug("pruning client '{}'".format(client_name))
- del self.clients[client_name]
-
- def dump(self):
-
- gw_stats = {}
- client_stats = {}
-
- for metric in ISCSIGateway.metrics:
- gw_stats[metric] = getattr(self, metric)
-
- for client_name in self.clients:
- client = self.clients[client_name]
- client_stats.update(client.dump())
-
- return {"iscsi": {
- "gw_name": {self.gateway_name: 0},
- "gw_stats": gw_stats,
- "gw_clients": client_stats
- }
- }
-
- def _get_so(self):
- return [so for so in self._root.storage_objects]
-
- def _get_node_acls(self):
- return [node for node in self._root.node_acls]
-
- @property
- def tpg_count(self):
- return len([tpg for tpg in self._root.tpgs])
-
- @property
- def lun_count(self):
- return len(self._get_so())
-
- @property
- def sessions(self):
- return len([session for session in self._root.sessions])
-
- @property
- def gateway_name(self):
- # Only the 1st gateway is considered/supported
- gw_iqn = [gw.wwn for gw in self._root.targets][0]
- return gw_iqn.replace('.', '-')
-
- @property
- def client_count(self):
- return len(self._get_node_acls())
-
- @property
- def capacity(self):
- return sum([so.size for so in self._get_so()])
-
- def get_stats(self):
-
- start = time.time()
-
- # populate gateway instance with the latest configuration from rtslib
- self.refresh()
-
- # Overtime they'll be churn in client and disks so we need to drop
- # any entries from prior runs that are no longer seen in the iscsi
- # configuration with the prune method
- self.prune()
-
- end = time.time()
-
- self.logger.info("LIO stats took {}s".format(end - start))
-
- return self.dump()
+++ /dev/null
-#!/usr/bin/env python
-
-import rados
-import rbd
-import json
-import threading
-import time
-import logging
-
-from cephmetrics.collectors import (base, common)
-
-
-class RBDScanner(threading.Thread):
-
- def __init__(self, cluster_name, pool_name):
- self.cluster_name = cluster_name
- self.pool_name = pool_name
- self.num_rbds = 0
- self.logger = logging.getLogger('cephmetrics')
-
- threading.Thread.__init__(self)
-
- def run(self):
- rbd_images = []
- conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
- self.logger.debug("scan of '{}' starting".format(self.pool_name))
- with rados.Rados(conffile=conf_file) as cluster:
- with cluster.open_ioctx(self.pool_name) as ioctx:
- rbd_inst = rbd.RBD()
- self.logger.debug("listing rbd's in {}".format(self.pool_name))
- rbd_images = rbd_inst.list(ioctx)
-
- self.logger.info("pool scan complete for '{}'".format(self.pool_name))
- self.num_rbds = len(rbd_images)
-
-
-class Mon(base.BaseCollector):
-
- health = {
- "HEALTH_OK": 0,
- "HEALTH_WARN": 4,
- "HEALTH_ERR": 8
- }
-
- osd_state = {
- "up": 0,
- "down": 1
- }
-
- # metrics are declared, where each element has a description and collectd
- # data type. The description is used to ensure the names sent by collectd
- # remain the same even if the source name changes in ceph.
- cluster_metrics = {
- "num_mon": ("num_mon", "gauge"),
- "num_mon_quorum": ("num_mon_quorum", "gauge"),
- "num_rbds": ("num_rbds", "gauge"),
- "num_osd_hosts": ("num_osd_hosts", "gauge"),
- "num_osd": ("num_osd", "gauge"),
- "num_osd_up": ("num_osd_up", "gauge"),
- "num_osd_in": ("num_osd_in", "gauge"),
- "osd_epoch": ("osd_epoch", "gauge"),
- "osd_bytes": ("osd_bytes", "gauge"),
- "osd_bytes_used": ("osd_bytes_used", "gauge"),
- "osd_bytes_avail": ("osd_bytes_avail", "gauge"),
- "num_pool": ("num_pool", "gauge"),
- "num_pg": ("num_pg", "gauge"),
- "num_pg_active_clean": ("num_pg_active_clean", "gauge"),
- "num_pg_active": ("num_pg_active", "gauge"),
- "num_pg_peering": ("num_pg_peering", "gauge"),
- "num_object": ("num_object", "gauge"),
- "num_object_degraded": ("num_object_degraded", "gauge"),
- "num_object_misplaced": ("num_object_misplaced", "gauge"),
- "num_object_unfound": ("num_object_unfound", "gauge"),
- "num_bytes": ("num_bytes", "gauge"),
- "num_mds_up": ("num_mds_up", "gauge"),
- "num_mds_in": ("num_mds_in", "gauge"),
- "num_mds_failed": ("num_mds_failed", "gauge"),
- "mds_epoch": ("mds_epoch", "gauge"),
- "health": ("health", "gauge")
- }
-
- pool_client_metrics = {
- 'bytes_sec': ("bytes_sec", "gauge"),
- 'op_per_sec': ("op_per_sec", "gauge"),
- 'read_bytes_sec': ("read_bytes_sec", "gauge"),
- 'write_op_per_sec': ("write_op_per_sec", "gauge"),
- 'write_bytes_sec': ("write_bytes_sec", "gauge"),
- 'read_op_per_sec': ("read_op_per_sec", "gauge")
- }
-
- pool_recovery_metrics = {
- "recovering_objects_per_sec": ("recovering_objects_per_sec", "gauge"),
- "recovering_bytes_per_sec": ("recovering_bytes_per_sec", "gauge"),
- "recovering_keys_per_sec": ("recovering_keys_per_sec", "gauge"),
- "num_objects_recovered": ("num_objects_recovered", "gauge"),
- "num_bytes_recovered": ("num_bytes_recovered", "gauge"),
- "num_keys_recovered": ("num_keys_recovered", "gauge")
- }
-
- osd_metrics = {
- "status": ("status", "gauge")
- }
-
- mon_states = {
- "mon_status": ("mon_status", "gauge")
- }
-
- all_metrics = common.merge_dicts(
- pool_recovery_metrics, pool_client_metrics)
- all_metrics = common.merge_dicts(all_metrics, cluster_metrics)
- all_metrics = common.merge_dicts(all_metrics, osd_metrics)
- all_metrics = common.merge_dicts(all_metrics, mon_states)
-
- def __init__(self, *args, **kwargs):
- base.BaseCollector.__init__(self, *args, **kwargs)
- self.version = self._get_version()
- if self.version < 12:
- self.get_mon_health = self._mon_health
- else:
- self.get_mon_health = self._mon_health_new
-
- def _get_version(self):
- vers_info = self._mon_command('version')
- return int(vers_info['version'].replace('.', ' ').split()[2])
-
- def _mon_command(self, cmd_request):
- """ Issue a command to the monitor """
-
- buf_s = '{}'
- conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
-
- start = time.time()
- with rados.Rados(conffile=conf_file) as cluster:
- cmd = {'prefix': cmd_request, 'format': 'json'}
- rc, buf_s, out = cluster.mon_command(json.dumps(cmd), b'')
- end = time.time()
-
- self.logger.debug("_mon_command call '{}' :"
- " {:.3f}s".format(cmd_request,
- (end - start)))
-
- return json.loads(buf_s)
-
- @staticmethod
- def get_feature_state(summary_data, pg_states):
- """
- Look at the summary list to determine the state of RADOS features
- :param summary_data: (list) summary data from a ceph health command
- :return: (dict) dict indexed by feature
- 0 Inactive, 1 Active, 2 Disabled
- """
- feature_lookup = {"noscrub": "scrub",
- "nodeep-scrub": "deep_scrub",
- "norecover": "recovery",
- "nobackfill": "backfill",
- "norebalance": "rebalance",
- "noout": "out",
- "nodown": "down"}
-
- # Start with all features inactive i.e. enabled
- feature_state = {feature_lookup.get(key): 0 for key in feature_lookup}
-
- for summary in summary_data:
- summary_desc = summary.get('summary')
- if "flag(s) set" in summary_desc:
- flags = summary_desc.replace(' flag(s) set', '').split(',')
- for disabled_feature in flags:
- if disabled_feature in feature_lookup:
- feature = feature_lookup.get(disabled_feature)
- feature_state[feature] = 2 # feature disabled
-
- # Now use the current pg state names to determine whether a feature is
- # active - if not it stays set to '0', which means inactive
- pg_state_names = [pg_state.get('name') for pg_state in pg_states]
- for pg_state in pg_state_names:
- states = pg_state.split('+')
- if 'recovering' in states:
- feature_state['recovery'] = 1 # Active
- continue
- if 'backfilling' in states:
- feature_state['backfill'] = 1
- continue
- if 'deep' in states:
- feature_state['deep_scrub'] = 1
- continue
- if 'scrubbing' in states:
- feature_state['scrub'] = 1
-
- return feature_state
-
- @classmethod
- def check_stuck_pgs(cls, summary_list):
- bad_pg_words = ['pgs', 'stuck', 'inactive']
- stuck_pgs = 0
- for summary_data in summary_list:
- if summary_data.get('severity') != 'HEALTH_ERR':
- continue
- if all(trigger in summary_data.get('summary')
- for trigger in bad_pg_words):
- stuck_pgs = int(summary_data.get('summary').split()[0])
-
- return stuck_pgs
-
- def _mon_health_new(self):
-
- cluster, health_data = self._mon_health_common()
-
- mon_status_output = self._mon_command('mon_status')
- quorum_list = mon_status_output.get('quorum')
- mon_list = mon_status_output.get('monmap').get('mons')
- mon_status = {}
- for mon in mon_list:
- state = 0 if mon.get('rank') in quorum_list else 4
- mon_status[mon.get('name')] = state
-
- cluster['mon_status'] = mon_status
-
- return cluster
-
- def _mon_health_common(self):
-
- # for v12 (Luminous and beyond) add the following setting to
- # ceph.conf "mon_health_preluminous_compat=true"
- # this will provide the same output as pre-luminous
-
- cluster_data = self._admin_socket().get('cluster')
- pg_data = self._mon_command("pg stat")
- health_data = self._mon_command("health")
- health_text = health_data.get('overall_status',
- health_data.get('status', ''))
-
- cluster = {Mon.cluster_metrics[k][0]: cluster_data[k]
- for k in cluster_data}
-
- health_num = Mon.health.get(health_text, 16)
-
- cluster['health'] = health_num
-
- pg_states = pg_data.get('num_pg_by_state') # list of dict name,num
- health_summary = health_data.get('summary', []) # list of issues
- cluster['num_pgs_stuck'] = Mon.check_stuck_pgs(health_summary)
- cluster['features'] = Mon.get_feature_state(health_summary,
- pg_states)
-
- self.logger.debug(
- 'Features:{}'.format(json.dumps(cluster['features'])))
-
- return cluster, health_data
-
- def _mon_health(self):
-
- cluster, health_data = self._mon_health_common()
-
- services = health_data.get('health').get('health_services')
- monstats = {}
- for svc in services:
- if 'mons' in svc:
- # Each monitor will have a numeric value denoting health
- monstats = { mon.get('name'): Mon.health.get(mon.get('health'))
- for mon in svc.get('mons')}
-
- cluster['mon_status'] = monstats
-
- return cluster
-
- @classmethod
- def _seed(cls, metrics):
- return {metrics[key][0]: 0 for key in metrics}
-
- def display_names(self, metric_format, metrics):
- """
- convert the keys to the static descriptions
- :return:
- """
- return {metric_format[k][0]: metrics[k]
- for k in metrics} if metrics else {}
-
- def _get_pool_stats(self):
- """ get pool stats from rados """
-
- raw_stats = self._mon_command('osd pool stats')
- pool_stats = {}
-
- # process each pool
- for pool in raw_stats:
-
- pool_name = pool['pool_name'].replace('.', '_')
- client_io = self.display_names(Mon.pool_client_metrics,
- pool.get('client_io_rate'))
- recovery = self.display_names(Mon.pool_recovery_metrics,
- pool.get('recovery_rate'))
-
- pool_md = {}
- if client_io:
-
- # Add pool level aggregation
- client_io['bytes_sec'] = client_io.get('read_bytes_sec', 0) + \
- client_io.get('write_bytes_sec', 0)
- client_io["op_per_sec"] = client_io.get('read_op_per_sec', 0)+ \
- client_io.get('write_op_per_sec', 0)
- pool_md = client_io
-
- else:
- pool_md = Mon._seed(Mon.pool_client_metrics)
-
- if recovery:
- pool_md = common.merge_dicts(pool_md, recovery)
- else:
- pool_md = common.merge_dicts(pool_md, Mon._seed(
- Mon.pool_recovery_metrics))
-
- pool_stats[pool_name] = pool_md
-
- return pool_stats
-
- def _get_osd_states(self):
-
- self.logger.debug("fetching osd states from the local mon")
- raw = self._mon_command('osd dump')
- osd_hosts = set()
- osds = {}
- for osd in raw.get('osds'):
- cluster_addr = osd.get('cluster_addr').split(':')[0]
- osd_hosts.add(cluster_addr)
-
- # NB. The key for the osds dict must be a string as the dict is
- # flattened when the metric name is derived in the parent collectd
- # module. If it is not converted, you get a TypeError
- osds[str(osd.get('osd'))] = {"up": osd.get('up'),
- "in": osd.get('in')}
-
- return len(osd_hosts), osds
-
- @staticmethod
- def _select_pools(pools, mons):
- """
- determine the pools this mon should scan based on it's name. We select
- pools from the an offset into the pool list, and then repeat at an
- interval set by # mons in the configuration. This splits up the pools
- we have, so each mon looks at a discrete set of pools instead of all
- mons performing all scans.
- :param pools: (list) rados pool names
- :param mons: (list) monitor names from ceph health
- :return: (list) of pools this monitor should scan. empty list if the
- monitor name mismatches - so no scans done
- """
-
- pools_to_scan = []
-
- try:
- freq = mons.index(common.get_hostname())
- except ValueError:
- # this host's name is not in the monitor list?
- # twilight zone moment
- pass
- else:
-
- pools_to_scan = [pools[ptr]
- for ptr in xrange(freq, len(pools), len(mons))]
-
- return pools_to_scan
-
- def get_pools(self):
- skip_pools = ('default.rgw')
-
- start = time.time()
- conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
- with rados.Rados(conffile=conf_file) as cluster:
- rados_pools = sorted(cluster.list_pools())
- end = time.time()
-
- self.logger.debug('lspools took {:.3f}s'.format(end - start))
-
- filtered_pools = [pool for pool in rados_pools
- if not pool.startswith(skip_pools)]
-
- return filtered_pools
-
- def _get_rbds(self, monitors):
-
- pool_list = self.get_pools()
- mon_list = sorted(monitors.keys())
- my_pools = Mon._select_pools(pool_list, mon_list)
- self.logger.debug("Pools to be scanned on this mon"
- " : {}".format(','.join(my_pools)))
- threads = []
-
- start = time.time()
-
- for pool in my_pools:
- thread = RBDScanner(self.cluster_name, pool)
- thread.start()
- threads.append(thread)
-
- # wait for all threads to complete
- for thread in threads:
- thread.join(1)
-
- end = time.time()
- self.logger.debug("rbd scans {:.3f}s".format((end - start)))
-
- total_rbds = sum([thread.num_rbds for thread in threads])
- self.logger.debug("total rbds found : {}".format(total_rbds))
-
- for thread in threads:
- del thread
-
- return total_rbds
-
- def get_stats(self):
- """
- method associated with the plugin callback to gather the metrics
- :return: (dict) metadata describing the state of the mon/osd's
- """
-
- start = time.time()
-
- pool_stats = self._get_pool_stats()
- num_osd_hosts, osd_states = self._get_osd_states()
- cluster_state = self.get_mon_health()
- cluster_state['num_osd_hosts'] = num_osd_hosts
- cluster_state['num_rbds'] = self._get_rbds(cluster_state['mon_status'])
-
- all_stats = common.merge_dicts(cluster_state, {"pools": pool_stats,
- "osd_state": osd_states})
-
- end = time.time()
- self.logger.info("mon get_stats call : {:.3f}s".format((end - start)))
-
- return {"mon": all_stats}
-
+++ /dev/null
-#!/usr/bin/env python
-
-import os
-import time
-import math
-
-from cephmetrics.collectors import (base, common)
-
-__author__ = "Paul Cuzner"
-
-
-class OSDstats(object):
-
- osd_capacity = {
- "stat_bytes": ("stat_bytes", "gauge"),
- "stat_bytes_used": ("stat_bytes_used", "gauge"),
- "stat_bytes_avail": ("stat_bytes_avail", "gauge")
- }
-
- filestore_metrics = {
- "journal_latency",
- "commitcycle_latency",
- "apply_latency",
- "queue_transaction_latency_avg"
- }
-
- def __init__(self, osd_type='filestore'):
- self._current = {}
- self._previous = {}
- self._osd_type = osd_type
- self.osd_percent_used = 0
-
- def update(self, stats):
- """
- update the objects attributes based on the dict
- :param stats: (dict) containing filestore performance ('filestore')
- and capacity info ('osd')
- :return: None
- """
-
- if self._current:
- self._previous = self._current
- self._current = stats['filestore']
- else:
- self._current = stats['filestore']
-
- for attr in OSDstats.filestore_metrics:
-
- if self._previous:
- d_sum = self._current[attr].get('sum') - \
- self._previous[attr].get('sum')
- d_avgcount = self._current[attr].get('avgcount') - \
- self._previous[attr].get('avgcount')
-
- if d_sum == 0 or d_avgcount == 0:
- val = 0
- else:
- val = float(d_sum) / d_avgcount
- else:
- # no previous value, so set to 0
- val = 0
-
- setattr(self, attr, val)
-
- for attr in stats['osd']:
- setattr(self, attr, stats['osd'].get(attr))
-
- self.osd_percent_used = math.ceil((float(self.stat_bytes_used) /
- self.stat_bytes) * 100)
-
-
-class OSDs(base.BaseCollector):
-
- all_metrics = common.merge_dicts(
- common.Disk.metrics, common.IOstat.metrics)
-
- def __init__(self, cluster_name, **kwargs):
- base.BaseCollector.__init__(self, cluster_name, **kwargs)
- self.timestamp = int(time.time())
-
- self.osd = {} # dict of disk objects, each disk contains osd_id
- self.jrnl = {} # dict of journal devices (if not collocated)
- self.osd_id_list = []
- self.dev_lookup = {} # dict dev_name -> osd | jrnl
- self.osd_count = 0
-
- def __repr__(self):
-
- s = ''
- for disk in self.osd:
- s += "{}\n".format(disk)
- dev = self.osd[disk]
-
- for var in vars(dev):
- if not var.startswith('_'):
- s += "{} ... {}\n".format(var, getattr(dev, var))
- return s
-
- def _fetch_osd_stats(self, osd_id):
-
- # NB: osd stats are cumulative
-
- stats = {}
- osd_socket_name = '/var/run/ceph/{}-osd.{}.asok'.format(
- self.cluster_name, osd_id)
-
- if not os.path.exists(osd_socket_name):
- # all OSD's should expose an admin socket, so if it's missing
- # this node has a problem!
- raise IOError("Socket file missing for OSD {}".format(osd_id))
-
- self.logger.debug("fetching osd stats for osd {}".format(osd_id))
- resp = self._admin_socket(socket_path=osd_socket_name)
-
- filestore_stats = resp.get('filestore')
- stats['filestore'] = {key_name: filestore_stats.get(key_name)
- for key_name in OSDstats.filestore_metrics}
-
- osd_stats = resp.get('osd')
-
- # Add disk usage stats
- stats['osd'] = {key_name: osd_stats.get(key_name)
- for key_name in OSDstats.osd_capacity.keys()}
-
- return stats
-
- @staticmethod
- def get_osd_type(osd_path):
-
- osd_type_fname = os.path.join(osd_path, 'type')
- if os.path.exists(osd_type_fname):
- return common.fread(osd_type_fname)
- else:
- if os.path.exists(os.path.join(osd_path, 'journal')):
- return "filestore"
- else:
- raise ValueError("Unrecognised OSD type")
-
- def _dev_to_osd(self):
- """
- Look at the system to determine which disks are acting as OSD's
- """
-
- # the logic here uses the mount points to determine which OSD's are
- # in the system. The encryption state is determine just by the use
- # devicemapper (i.e. /dev/mapper prefixed devices) - since at this time
- # this is all dm is used for.
-
- osd_indicators = {'var', 'lib', 'osd'}
-
- for mnt in common.freadlines('/proc/mounts'):
- items = mnt.split(' ')
- dev_path, path_name = items[:2]
- if path_name.startswith('/var/lib'):
- # take a close look since this is where ceph osds usually
- # get mounted
- dirs = set(path_name.split('/'))
- if dirs.issuperset(osd_indicators):
-
- # get the osd_id from the name is the most simple way
- # to get the id, due to naming conventions. If this fails
- # though, plan 'b' is the whoami file
- osd_id = path_name.split('-')[-1]
- if not osd_id.isdigit():
- osd_id = common.fread(
- os.path.join(path_name, 'whoami'))
-
- if osd_id not in self.osd:
- osd_type = OSDs.get_osd_type(path_name)
- self.osd[osd_id] = OSDstats(osd_type=osd_type)
- self.osd_id_list.append(osd_id)
-
- osd_type = self.osd[osd_id]._osd_type
- if osd_type == 'filestore':
- if dev_path.startswith('/dev/mapper'):
- encrypted = 1
- uuid = dev_path.split('/')[-1]
- partuuid = '/dev/disk/by-partuuid/{}'.format(
- uuid)
- dev_path = os.path.realpath(partuuid)
- osd_device = dev_path.split('/')[-1]
- else:
- encrypted = 0
- osd_device = dev_path.split('/')[-1]
-
- elif osd_type == 'bluestore':
- block_link = os.path.join(path_name, 'block')
- osd_path = os.path.realpath(block_link)
- osd_device = osd_path.split('/')[-1]
- encrypted = 0
- else:
- raise ValueError("Unknown OSD type encountered")
-
- # if the osd_id hasn't been seem neither has the
- # disk
- self.osd[osd_device] = common.Disk(
- osd_device,
- path_name=path_name,
- osd_id=osd_id,
- in_osd_type=osd_type,
- encrypted=encrypted)
- self.dev_lookup[osd_device] = 'osd'
- self.osd_count += 1
-
- if osd_type == 'filestore':
- journal_link = os.path.join(path_name, 'journal')
- else:
- journal_link = os.path.join(path_name, 'block.wal')
-
- if os.path.exists(journal_link):
- link_tgt = os.readlink(journal_link)
- if link_tgt.startswith('/dev/mapper'):
- encrypted = 1
- else:
- encrypted = 0
-
- partuuid_path = os.path.join(
- '/dev/disk/by-partuuid',
- link_tgt.split('/')[-1])
- jrnl_path = os.path.realpath(partuuid_path)
- jrnl_dev = jrnl_path.split('/')[-1]
-
- if jrnl_dev not in self.osd:
- self.jrnl[jrnl_dev] = common.Disk(
- jrnl_dev,
- osd_id=osd_id,
- in_osd_type=osd_type,
- encrypted=encrypted)
-
- self.dev_lookup[jrnl_dev] = 'jrnl'
-
- else:
- # No journal or WAL link..?
- pass
-
- def _stats_lookup(self):
- """
- Grab the disk stats from /proc/diskstats, and the key osd perf dump
- counters
- """
-
- now = time.time()
- interval = int(now) - self.timestamp
- self.timestamp = int(now)
-
- # Fetch diskstats from the OS
- for perf_entry in common.freadlines('/proc/diskstats'):
-
- field = perf_entry.split()
- dev_name = field[2]
-
- device = None
- if self.dev_lookup.get(dev_name, None) == 'osd':
- device = self.osd[dev_name]
- elif self.dev_lookup.get(dev_name, None) == 'jrnl':
- device = self.jrnl[dev_name]
-
- if device:
- new_stats = field[3:]
-
- if device.perf._current:
- device.perf._previous = device.perf._current
- device.perf._current = new_stats
- else:
- device.perf._current = new_stats
-
- device.perf.compute(interval)
- device.refresh()
-
- end = time.time()
- self.logger.debug("OS disk stats calculated in "
- "{:.4f}s".format(end-now))
-
- # fetch stats from each osd daemon
- osd_stats_start = time.time()
- for osd_id in self.osd_id_list:
-
- if self.osd[osd_id]._osd_type == 'filestore':
- osd_stats = self._fetch_osd_stats(osd_id)
-
- # self.logger.debug('stats : {}'.format(osd_stats))
-
- osd_device = self.osd[osd_id]
- osd_device.update(osd_stats)
- else:
- self.logger.debug("skipped 'bluestore' osd perf collection "
- "for osd.{}".format(osd_id))
-
- osd_stats_end = time.time()
- self.logger.debug(
- "OSD perf dump stats collected for {} OSDs in {:.3f}s".format(
- len(self.osd_id_list), (osd_stats_end - osd_stats_start)))
-
- @staticmethod
- def _dump_devs(device_dict):
-
- dumped = {}
-
- for dev_name in sorted(device_dict):
- device = device_dict[dev_name]
- dumped[dev_name] = common.todict(device)
-
- return dumped
-
- def dump(self):
- """
- dump the osd object(s) to a dict. The object *must* not have references
- to other objects - if this rule is broken cephmetrics caller will fail
- when parsing the dict
-
- :return: (dict) dictionary representation of this OSDs on this host
- """
-
- return {
- "num_osds": self.osd_count,
- "osd": OSDs._dump_devs(self.osd),
- "jrnl": OSDs._dump_devs(self.jrnl)
- }
-
- def get_stats(self):
-
- start = time.time()
-
- self._dev_to_osd()
- self._stats_lookup()
-
- end = time.time()
-
- self.logger.info("osd get_stats call "
- ": {:.3f}s".format((end - start)))
-
- return self.dump()
+++ /dev/null
-#!/usr/bin/env python
-
-import time
-
-from cephmetrics.collectors import (base, common)
-
-__author__ = "paul.cuzner@redhat.com"
-
-
-class RGW(base.BaseCollector):
-
- simple_metrics = {
- "req": ("requests", "derive"),
- "failed_req": ("requests_failed", "derive"),
- "get": ("gets", "derive"),
- "get_b": ("get_bytes", "derive"),
- "put": ("puts", "derive"),
- "put_b": ("put_bytes", "derive"),
- "qlen": ("qlen", "derive"),
- "qactive": ("requests_active", "derive")
- }
-
- int_latencies = [
- "get_initial_lat",
- "put_initial_lat"
- ]
-
- latencies = {
- "get_initial_lat_sum": ("get_initial_lat_sum", "derive"),
- "get_initial_lat_avgcount": ("get_initial_lat_avgcount", "derive"),
- "put_initial_lat_sum": ("put_initial_lat_sum", "derive"),
- "put_initial_lat_avgcount": ("put_initial_lat_avgcount", "derive")
- }
-
- all_metrics = common.merge_dicts(simple_metrics, latencies)
-
- def __init__(self, cluster_name, admin_socket, **kwargs):
- base.BaseCollector.__init__(self, cluster_name, admin_socket, **kwargs)
- self.host_name = common.get_hostname()
-
- def _get_rgw_data(self):
-
- response = self._admin_socket()
-
- key_name = 'client.rgw.{}'.format(self.host_name)
-
- return response.get(key_name)
-
- def _filter(self, stats):
- # pick out the simple metrics
-
- filtered = {key: stats[key] for key in RGW.simple_metrics}
-
- for key in RGW.int_latencies:
- for _attr in stats[key]:
- new_key = "{}_{}".format(key, _attr)
- filtered[new_key] = stats[key].get(_attr)
-
- return filtered
-
- def get_stats(self):
-
- start = time.time()
-
- raw_stats = self._get_rgw_data()
-
- stats = self._filter(raw_stats)
-
- end = time.time()
-
- self.logger.info("RGW get_stats : {:.3f}s".format((end - start)))
-
- return {"rgw": stats}
--- /dev/null
+#!/usr/bin/env python
+import glob
+import os
+import sys
+
+# Remove collectd's plugin dir from sys.path to avoid namespace collisions with
+# .so files that are not actually Python modules
+sys.path = [item for item in sys.path if not item.endswith('/collectd')]
+
+from . import (common, iscsi, mon, osd, rgw)
+
+
+class Ceph(object):
+ def __init__(self):
+ self.cluster_name = None
+ self.host_name = common.get_hostname()
+
+ self.mon_socket = None
+ self.rgw_socket = None
+
+ self.mon = None
+ self.rgw = None
+ self.osd = None
+ self.iscsi = None
+
+ def probe(self):
+ """
+ set up which collector(s) to use, based on what types of sockets we
+ find in /var/run/ceph
+ """
+
+ mon_socket = '/var/run/ceph/{}-mon.{}.asok'.format(self.cluster_name,
+ self.host_name)
+ if os.path.exists(mon_socket):
+ self.mon_socket = mon_socket
+ self.mon = mon.Mon(self.cluster_name, admin_socket=mon_socket)
+
+ rgw_socket_list = glob.glob('/var/run/ceph/{}-client.rgw.*.'
+ 'asok'.format(self.cluster_name))
+
+ if rgw_socket_list:
+ rgw_socket = rgw_socket_list[0]
+ self.rgw = rgw.RGW(self.cluster_name, admin_socket=rgw_socket)
+
+ osd_socket_list = glob.glob('/var/run/ceph/{}-osd.*'
+ '.asok'.format(self.cluster_name))
+ mounted = common.freadlines('/proc/mounts')
+ osds_mounted = [mnt for mnt in mounted
+ if mnt.split()[1].startswith('/var/lib/ceph')]
+ if osd_socket_list or osds_mounted:
+ self.osd = osd.OSDs(self.cluster_name)
+
+ if os.path.exists('/sys/kernel/config/target/iscsi'):
+ self.iscsi = iscsi.ISCSIGateway(self.cluster_name)
--- /dev/null
+#!/usr/bin/env python
+
+import json
+import time
+import logging
+
+from ceph_daemon import admin_socket
+
+
+class BaseCollector(object):
+
+ def __init__(self, cluster_name, admin_socket=None):
+ self.cluster_name = cluster_name
+ self.admin_socket = admin_socket
+
+ self.logger = logging.getLogger('cephmetrics')
+
+ def _admin_socket(self, cmds=None, socket_path=None):
+
+ adm_socket = self.admin_socket if not socket_path else socket_path
+
+ if not cmds:
+ cmds = ['perf', 'dump']
+
+ start = time.time()
+ response = admin_socket(adm_socket, cmds,
+ format='json')
+ end = time.time()
+
+ self.logger.debug("admin_socket call '{}' : "
+ "{:.3f}s".format(' '.join(cmds),
+ (end - start)))
+
+ return json.loads(response)
+
+ def get_stats(self):
+
+ return {}
--- /dev/null
+#!/usr/bin/env python
+
+
+import socket
+from os import statvfs
+import math
+
+
+def get_hostname():
+ return socket.gethostname().split('.')[0]
+
+
+def add_dicts(dict1, dict2):
+ """
+ Add dictionary values together
+ :param dict1:
+ :param dict2:
+ :return: dict with matching fields sum'd together
+ """
+ return {key: dict1.get(key, 0) + dict2.get(key, 0)
+ for key in set(dict1).union(dict2)}
+
+
+def merge_dicts(dict1, dict2):
+ """
+ merges two dicts together to form a single dict. when dict keys overlap
+ the value in the 2nd dict takes precedence
+ :param dict1:
+ :param dict2:
+ :return: combined dict
+ """
+
+ new = dict1.copy()
+ new.update(dict2)
+
+ return new
+
+
+def flatten_dict(data, separator='.', prefix=''):
+ """
+ flatten a dict, so it is just simple key/value pairs
+ :param data: (dict)
+ :param separator: (str) char to use when combining keys
+ :param prefix: key prefix
+ :return:
+ """
+ return {prefix + separator + k if prefix else k: v
+ for kk, vv in data.items()
+ for k, v in flatten_dict(vv, separator, kk).items()
+ } if isinstance(data, dict) else {prefix: data}
+
+
+def todict(obj):
+ """
+ convert an object to a dict representation
+ :param obj: (object) object to examine, to extract variables/values from
+ :return: (dict) representation of the given object
+ """
+ data = {}
+ for key, value in obj.__dict__.iteritems():
+
+ if key.startswith('_'):
+ continue
+
+ try:
+ data[key] = todict(value)
+ except AttributeError:
+ data[key] = value
+
+ return data
+
+
+def fread(file_name=None):
+ """
+ Simple read function for files of a single value
+ :param file_name: (str) file name to read
+ :return: (str) contents of the file
+ """
+
+ with open(file_name, 'r') as f:
+ setting = f.read().rstrip()
+ return setting
+
+
+def freadlines(file_name=None):
+ """
+ simple readlines function to return all records of a given file
+ :param file_name: (str) file name to read
+ :return: (list) contents of the file
+ """
+
+ with open(file_name, 'r') as f:
+ data = f.readlines()
+ return data
+
+
+class IOstat(object):
+ raw_metrics = [
+ "_reads",
+ "_reads_mrgd",
+ "_sectors_read",
+ "_read_ms",
+ "_writes",
+ "_writes_mrgd",
+ "_sectors_written",
+ "_write_ms",
+ "_current_io",
+ "_ms_active_io",
+ "_ms_active_io_w"
+ ]
+
+ sector_size = 512
+
+ metrics = {
+ "iops": ("iops", "gauge"),
+ "r_iops": ("r_iops", "gauge"),
+ "w_iops": ("w_iops", "gauge"),
+ "bytes_per_sec": ("bytes_per_sec", "gauge"),
+ "r_bytes_per_sec": ("r_bytes_per_sec", "gauge"),
+ "w_bytes_per_sec": ("w_bytes_per_sec", "gauge"),
+ "util": ("util", "gauge"),
+ "await": ("await", "gauge"),
+ "r_await": ("r_await", "gauge"),
+ "w_await": ("w_await", "gauge"),
+ }
+
+ def __init__(self):
+ self._previous = []
+ self._current = []
+
+ # Seed the metrics we're interested in
+ for ctr in IOstat.metrics.keys():
+ setattr(self, ctr, 0)
+
+ def __str__(self):
+ s = '\n- IOstat object:\n'
+ for key in sorted(vars(self)):
+ s += '\t{} ... {}\n'.format(key, getattr(self, key))
+ return s
+
+ def _calc_raw_delta(self):
+ if not self._previous:
+ # nothing to compute yet
+ for ptr in range(len(IOstat.raw_metrics)):
+ key = IOstat.raw_metrics[ptr]
+ setattr(self, key, 0)
+ else:
+ for ptr in range(len(IOstat.raw_metrics)):
+ key = IOstat.raw_metrics[ptr]
+ setattr(self, key, (int(self._current[ptr]) -
+ int(self._previous[ptr])))
+
+ def compute(self, sample_interval):
+ """
+ Calculate the iostats for this device
+ """
+
+ self._calc_raw_delta()
+
+ if sample_interval > 0:
+ interval_ms = sample_interval * 1000
+ total_io = self._reads + self._writes
+ self.util = float(self._ms_active_io) / interval_ms * 100
+ self.iops = int(total_io) / sample_interval
+ self.r_iops = int(self._reads) / sample_interval
+ self.w_iops = int(self._writes) / sample_interval
+ self.await = (float(self._write_ms + self._read_ms) / total_io if
+ total_io > 0 else 0)
+ self.w_await = float(
+ self._write_ms) / self._writes if self._writes > 0 else 0
+ self.r_await = float(
+ self._read_ms) / self._reads if self._reads > 0 else 0
+ self.r_bytes_per_sec = (float(
+ self._sectors_read * IOstat.sector_size)) / sample_interval
+ self.w_bytes_per_sec = (float(
+ self._sectors_written * IOstat.sector_size)) / sample_interval
+ self.bytes_per_sec = self.r_bytes_per_sec + self.w_bytes_per_sec
+
+
+class Disk(object):
+
+ metrics = {
+ "rotational": ("rotational", "gauge"),
+ "disk_size": ("disk_size", "gauge"),
+ "fs_size": ("fs_size", "gauge"),
+ "fs_used": ("fs_used", "gauge"),
+ "fs_percent_used": ("fs_percent_used", "gauge"),
+ "osd_id": ("osd_id", "gauge")
+ }
+
+ osd_types = {"filestore": 0, "bluestore": 1}
+
+ def __init__(self, device_name, path_name=None, osd_id=None,
+ in_osd_type="filestore", encrypted=0):
+
+ self._name = device_name
+ self._path_name = path_name
+ self._base_dev = Disk.get_base_dev(device_name)
+ self.osd_id = osd_id
+
+ self.rotational = self._get_rota()
+ self.disk_size = self._get_size()
+ self.perf = IOstat()
+ self.fs_size = 0
+ self.fs_percent_used = 0
+ self.fs_used = 0
+ self.encrypted = encrypted
+ self.osd_type = Disk.osd_types[in_osd_type]
+
+ self.refresh()
+
+ def _get_size(self):
+ return int(fread("/sys/block/{}/size".format(self._base_dev))) * 512
+
+ def _get_rota(self):
+ return int(
+ fread("/sys/block/{}/queue/rotational".format(self._base_dev)))
+
+ def _get_fssize(self):
+ s = statvfs("{}/whoami".format(self._path_name))
+ fs_size = s.f_blocks * s.f_bsize
+ fs_used = fs_size - (s.f_bfree * s.f_bsize)
+ fs_percent_used = math.ceil((float(fs_used) / fs_size) * 100)
+ return fs_size, fs_used, fs_percent_used
+
+ def refresh(self):
+        # only run the fs size update if _path_name is set.
+ if self._path_name:
+ self.fs_size, self.fs_used, self.fs_percent_used = \
+ self._get_fssize()
+
+ @staticmethod
+ def get_base_dev(dev_name):
+
+ # for intelcas devices, just use the device name as is
+ if dev_name.startswith('intelcas'):
+ device = dev_name
+ elif dev_name.startswith('nvme'):
+ if 'p' in dev_name:
+ device = dev_name[:(dev_name.index('p'))]
+ else:
+ device = dev_name
+ else:
+ # default strip any numeric ie. sdaa1 -> sdaa
+ device = filter(lambda ch: ch.isalpha(), dev_name)
+
+ return device
--- /dev/null
+#!/usr/bin/env python2
+
+# requires python-rtslib_fb for LIO interaction
+#
+# NB. the rtslib_fb module is dynamically loaded by the ISCSIGateway
+# class instantiation. This prevents import errors within the generic parent
+# module cephmetrics
+#
+import os
+import sys
+import time
+
+from cephmetrics_collectors import (base, common)
+
+
+class Client(object):
+
+ def __init__(self, iqn):
+ self.iqn = iqn
+ self.name = iqn.replace('.', '-')
+ self.luns = {}
+ self.lun_count = 0
+ self._cycle = 0
+
+ def dump(self):
+ client_dump = {}
+ lun_info = {}
+ client_dump[self.name] = {"luns": {},
+ "lun_count": self.lun_count}
+ for lun_name in self.luns:
+ lun = self.luns[lun_name]
+ lun_info.update(lun.dump())
+
+ return {self.name: {"luns": lun_info,
+ "lun_count": len(lun_info)}
+ }
+
+
+class LUN(object):
+
+ def __init__(self, client, tpg_lun):
+ self._path = tpg_lun.storage_object.path
+ self._tpg_lun = tpg_lun
+ self._name = tpg_lun.storage_object.name
+ self._display_name = tpg_lun.storage_object.name.replace('.', "-")
+ self._so = tpg_lun.storage_object
+ self._client = client
+ self._cycle = 0
+ self.size = 0
+ self.iops = 0
+ self.read_bytes_per_sec = 0
+ self.write_bytes_per_sec = 0
+ self.total_bytes_per_sec = 0
+ self.active_path = 0
+
+ def refresh(self, cycle_id):
+ self._cycle = cycle_id
+ self.size = self._so.size
+ stats_path = os.path.join(self._path, 'statistics/scsi_lu')
+ self.iops = int(common.fread(os.path.join(stats_path, "num_cmds")))
+ read_mb = float(common.fread(os.path.join(stats_path, "read_mbytes")))
+ write_mb = float(
+ common.fread(os.path.join(stats_path, "write_mbytes")))
+ self.read_bytes_per_sec = int(read_mb * 1024 ** 2)
+ self.write_bytes_per_sec = int(write_mb * 1024 ** 2)
+ self.total_bytes_per_sec = self.read_bytes_per_sec + \
+ self.write_bytes_per_sec
+
+ if self._tpg_lun.alua_tg_pt_gp_name == 'ao':
+ self.active_path = 1
+ else:
+ self.active_path = 0
+
+ def dump(self):
+ return {self._display_name: {k: getattr(self, k) for k in self.__dict__
+ if not k.startswith("_")}}
+
+
+class ISCSIGateway(base.BaseCollector):
+ """
+ created on a host that has a /sys/kernel/config/target/iscsi dir
+ i.e. there is an iscsi gateway here!
+ """
+
+ metrics = {
+ "lun_count": ("lun_count", "gauge"),
+ "client_count": ("client_count", "gauge"),
+ "tpg_count": ("tpg_count", "gauge"),
+ "sessions": ("sessions", "gauge"),
+ "capacity": ("capacity", "gauge"),
+ "iops": ("iops", "derive"),
+ "read_bytes_per_sec": ("read_bytes_per_sec", "derive"),
+ "write_bytes_per_sec": ("write_bytes_per_sec", "derive"),
+ "total_bytes_per_sec": ("total_bytes_per_sec", "derive")
+ }
+
+ def __init__(self, *args, **kwargs):
+ base.BaseCollector.__init__(self, *args, **kwargs)
+
+ # Since the module can be imported by a parent class but not
+ # instantiated, the rtslib import is deferred until the first instance
+        # of the class is created. This keeps the parent module simple
+ # and more importantly generic
+ if 'rtslib_fb.root' not in sys.modules.keys():
+
+ try:
+ import rtslib_fb.root as RTSRoot
+ except ImportError:
+ raise ImportError("rtslib_fb python package is missing")
+
+ self._root = RTSRoot()
+
+ self.clients = {}
+ self.cycle = 0
+
+ self.iops = 0
+ self.read_bytes_per_sec = 0
+ self.write_bytes_per_sec = 0
+ self.total_bytes_per_sec = 0
+
+ def refresh(self):
+ """
+ populate the instance by exploring rtslib
+ """
+
+ self.iops = 0
+ self.read_bytes_per_sec = 0
+ self.write_bytes_per_sec = 0
+ self.total_bytes_per_sec = 0
+
+ if self.cycle == 10:
+ self.cycle = 0
+ else:
+ self.cycle += 1
+
+ for node_acl in self._root.node_acls:
+
+ client_name = node_acl.node_wwn
+
+ if client_name not in self.clients:
+ new_client = Client(client_name)
+ self.clients[client_name] = new_client
+
+ client = self.clients[client_name]
+ client.lun_count = 0
+ client._cycle = self.cycle
+
+ for lun in node_acl.mapped_luns:
+ client.lun_count += 1
+ tpg_lun = lun.tpg_lun
+ lun_name = tpg_lun.storage_object.name
+ if lun_name not in client.luns:
+ lun = LUN(client, tpg_lun)
+ client.luns[lun._name] = lun
+ else:
+ lun = client.luns[lun_name]
+
+ lun.refresh(self.cycle)
+
+ self.iops += lun.iops
+ self.read_bytes_per_sec += lun.read_bytes_per_sec
+ self.write_bytes_per_sec += lun.write_bytes_per_sec
+ self.total_bytes_per_sec = self.read_bytes_per_sec + \
+ self.write_bytes_per_sec
+
+ def prune(self):
+ """
+        drop child objects held by the instance that are no longer in the
+        iSCSI config, i.e. don't report on old information
+ """
+
+ for client_name in self.clients:
+ client = self.clients[client_name]
+
+ for lun_name in client.luns:
+ lun = client.luns[lun_name]
+ if lun._cycle != self.cycle:
+ # drop the lun entry
+ self.logger.debug("pruning LUN '{}'".format(lun_name))
+
+ del client.luns[lun_name]
+
+ if client._cycle != self.cycle:
+ # drop the client entry
+ self.logger.debug("pruning client '{}'".format(client_name))
+ del self.clients[client_name]
+
+ def dump(self):
+
+ gw_stats = {}
+ client_stats = {}
+
+ for metric in ISCSIGateway.metrics:
+ gw_stats[metric] = getattr(self, metric)
+
+ for client_name in self.clients:
+ client = self.clients[client_name]
+ client_stats.update(client.dump())
+
+ return {"iscsi": {
+ "gw_name": {self.gateway_name: 0},
+ "gw_stats": gw_stats,
+ "gw_clients": client_stats
+ }
+ }
+
+ def _get_so(self):
+ return [so for so in self._root.storage_objects]
+
+ def _get_node_acls(self):
+ return [node for node in self._root.node_acls]
+
+ @property
+ def tpg_count(self):
+ return len([tpg for tpg in self._root.tpgs])
+
+ @property
+ def lun_count(self):
+ return len(self._get_so())
+
+ @property
+ def sessions(self):
+ return len([session for session in self._root.sessions])
+
+ @property
+ def gateway_name(self):
+ # Only the 1st gateway is considered/supported
+ gw_iqn = [gw.wwn for gw in self._root.targets][0]
+ return gw_iqn.replace('.', '-')
+
+ @property
+ def client_count(self):
+ return len(self._get_node_acls())
+
+ @property
+ def capacity(self):
+ return sum([so.size for so in self._get_so()])
+
+ def get_stats(self):
+
+ start = time.time()
+
+ # populate gateway instance with the latest configuration from rtslib
+ self.refresh()
+
+        # Over time there'll be churn in clients and disks, so we need to drop
+ # any entries from prior runs that are no longer seen in the iscsi
+ # configuration with the prune method
+ self.prune()
+
+ end = time.time()
+
+ self.logger.info("LIO stats took {}s".format(end - start))
+
+ return self.dump()
--- /dev/null
+#!/usr/bin/env python
+
+import rados
+import rbd
+import json
+import threading
+import time
+import logging
+
+from cephmetrics_collectors import (base, common)
+
+
+class RBDScanner(threading.Thread):
+
+ def __init__(self, cluster_name, pool_name):
+ self.cluster_name = cluster_name
+ self.pool_name = pool_name
+ self.num_rbds = 0
+ self.logger = logging.getLogger('cephmetrics')
+
+ threading.Thread.__init__(self)
+
+ def run(self):
+ rbd_images = []
+ conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
+ self.logger.debug("scan of '{}' starting".format(self.pool_name))
+ with rados.Rados(conffile=conf_file) as cluster:
+ with cluster.open_ioctx(self.pool_name) as ioctx:
+ rbd_inst = rbd.RBD()
+ self.logger.debug("listing rbd's in {}".format(self.pool_name))
+ rbd_images = rbd_inst.list(ioctx)
+
+ self.logger.info("pool scan complete for '{}'".format(self.pool_name))
+ self.num_rbds = len(rbd_images)
+
+
+class Mon(base.BaseCollector):
+
+ health = {
+ "HEALTH_OK": 0,
+ "HEALTH_WARN": 4,
+ "HEALTH_ERR": 8
+ }
+
+ osd_state = {
+ "up": 0,
+ "down": 1
+ }
+
+ # metrics are declared, where each element has a description and collectd
+ # data type. The description is used to ensure the names sent by collectd
+ # remain the same even if the source name changes in ceph.
+ cluster_metrics = {
+ "num_mon": ("num_mon", "gauge"),
+ "num_mon_quorum": ("num_mon_quorum", "gauge"),
+ "num_rbds": ("num_rbds", "gauge"),
+ "num_osd_hosts": ("num_osd_hosts", "gauge"),
+ "num_osd": ("num_osd", "gauge"),
+ "num_osd_up": ("num_osd_up", "gauge"),
+ "num_osd_in": ("num_osd_in", "gauge"),
+ "osd_epoch": ("osd_epoch", "gauge"),
+ "osd_bytes": ("osd_bytes", "gauge"),
+ "osd_bytes_used": ("osd_bytes_used", "gauge"),
+ "osd_bytes_avail": ("osd_bytes_avail", "gauge"),
+ "num_pool": ("num_pool", "gauge"),
+ "num_pg": ("num_pg", "gauge"),
+ "num_pg_active_clean": ("num_pg_active_clean", "gauge"),
+ "num_pg_active": ("num_pg_active", "gauge"),
+ "num_pg_peering": ("num_pg_peering", "gauge"),
+ "num_object": ("num_object", "gauge"),
+ "num_object_degraded": ("num_object_degraded", "gauge"),
+ "num_object_misplaced": ("num_object_misplaced", "gauge"),
+ "num_object_unfound": ("num_object_unfound", "gauge"),
+ "num_bytes": ("num_bytes", "gauge"),
+ "num_mds_up": ("num_mds_up", "gauge"),
+ "num_mds_in": ("num_mds_in", "gauge"),
+ "num_mds_failed": ("num_mds_failed", "gauge"),
+ "mds_epoch": ("mds_epoch", "gauge"),
+ "health": ("health", "gauge")
+ }
+
+    # Per-pool client I/O rates, as reported by 'osd pool stats'.
+    # Each entry maps source key -> (display name, collectd type).
+    pool_client_metrics = {
+        'bytes_sec': ("bytes_sec", "gauge"),
+        'op_per_sec': ("op_per_sec", "gauge"),
+        'read_bytes_sec': ("read_bytes_sec", "gauge"),
+        'write_op_per_sec': ("write_op_per_sec", "gauge"),
+        'write_bytes_sec': ("write_bytes_sec", "gauge"),
+        'read_op_per_sec': ("read_op_per_sec", "gauge")
+    }
+
+    # Per-pool recovery rates/counters from 'osd pool stats'.
+    pool_recovery_metrics = {
+        "recovering_objects_per_sec": ("recovering_objects_per_sec", "gauge"),
+        "recovering_bytes_per_sec": ("recovering_bytes_per_sec", "gauge"),
+        "recovering_keys_per_sec": ("recovering_keys_per_sec", "gauge"),
+        "num_objects_recovered": ("num_objects_recovered", "gauge"),
+        "num_bytes_recovered": ("num_bytes_recovered", "gauge"),
+        "num_keys_recovered": ("num_keys_recovered", "gauge")
+    }
+
+    # Per-OSD up/in state (see _get_osd_states).
+    osd_metrics = {
+        "status": ("status", "gauge")
+    }
+
+    # Per-monitor quorum/health state (see _mon_health / _mon_health_new).
+    mon_states = {
+        "mon_status": ("mon_status", "gauge")
+    }
+
+    # Flat union of every metric this collector can emit; the collectd
+    # module uses this to validate/describe the flattened stat keys.
+    all_metrics = common.merge_dicts(
+        pool_recovery_metrics, pool_client_metrics)
+    all_metrics = common.merge_dicts(all_metrics, cluster_metrics)
+    all_metrics = common.merge_dicts(all_metrics, osd_metrics)
+    all_metrics = common.merge_dicts(all_metrics, mon_states)
+
+    def __init__(self, *args, **kwargs):
+        """Initialise the mon collector and pick a health strategy."""
+        base.BaseCollector.__init__(self, *args, **kwargs)
+        self.version = self._get_version()
+        # Pre-Luminous (< 12) clusters expose per-mon health in the
+        # 'health' output; Luminous+ needs 'mon_status' instead, so bind
+        # the appropriate implementation once at construction time.
+        if self.version < 12:
+            self.get_mon_health = self._mon_health
+        else:
+            self.get_mon_health = self._mon_health_new
+
+    def _get_version(self):
+        """Return the ceph major version as an int.
+
+        Assumes the 'version' mon command yields a string like
+        "ceph version 12.2.1 (...)" so that after replacing '.' with
+        spaces, token [2] is the major number — TODO confirm for all
+        supported releases.
+        """
+        vers_info = self._mon_command('version')
+        return int(vers_info['version'].replace('.', ' ').split()[2])
+
+    def _mon_command(self, cmd_request):
+        """ Issue a command to the monitor
+
+        :param cmd_request: (str) mon command prefix, e.g. 'osd dump'
+        :return: (dict/list) parsed JSON response from the monitor
+        """
+
+        # default to an empty JSON object so json.loads below is safe
+        # even if mon_command leaves buf_s untouched
+        buf_s = '{}'
+        conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
+
+        start = time.time()
+        with rados.Rados(conffile=conf_file) as cluster:
+            cmd = {'prefix': cmd_request, 'format': 'json'}
+            rc, buf_s, out = cluster.mon_command(json.dumps(cmd), b'')
+        end = time.time()
+
+        self.logger.debug("_mon_command call '{}' :"
+                          " {:.3f}s".format(cmd_request,
+                                            (end - start)))
+
+        return json.loads(buf_s)
+
+    @staticmethod
+    def get_feature_state(summary_data, pg_states):
+        """
+        Look at the summary list to determine the state of RADOS features
+        :param summary_data: (list) summary data from a ceph health command
+        :param pg_states: (list) dicts of pg state name/count pairs, used
+               to detect features that are currently active
+        :return: (dict) dict indexed by feature
+                 0 Inactive, 1 Active, 2 Disabled
+        """
+        # cluster flag name -> feature name reported in the metrics
+        feature_lookup = {"noscrub": "scrub",
+                          "nodeep-scrub": "deep_scrub",
+                          "norecover": "recovery",
+                          "nobackfill": "backfill",
+                          "norebalance": "rebalance",
+                          "noout": "out",
+                          "nodown": "down"}
+
+        # Start with all features inactive i.e. enabled
+        feature_state = {feature_lookup.get(key): 0 for key in feature_lookup}
+
+        # any "<flags> flag(s) set" summary entry marks those features
+        # as administratively disabled
+        for summary in summary_data:
+            summary_desc = summary.get('summary')
+            if "flag(s) set" in summary_desc:
+                flags = summary_desc.replace(' flag(s) set', '').split(',')
+                for disabled_feature in flags:
+                    if disabled_feature in feature_lookup:
+                        feature = feature_lookup.get(disabled_feature)
+                        feature_state[feature] = 2  # feature disabled
+
+        # Now use the current pg state names to determine whether a feature is
+        # active - if not it stays set to '0', which means inactive
+        pg_state_names = [pg_state.get('name') for pg_state in pg_states]
+        for pg_state in pg_state_names:
+            # pg state names are compound, e.g. "active+clean+scrubbing"
+            states = pg_state.split('+')
+            if 'recovering' in states:
+                feature_state['recovery'] = 1  # Active
+                continue
+            if 'backfilling' in states:
+                feature_state['backfill'] = 1
+                continue
+            if 'deep' in states:
+                feature_state['deep_scrub'] = 1
+                continue
+            if 'scrubbing' in states:
+                feature_state['scrub'] = 1
+
+        return feature_state
+
+    @classmethod
+    def check_stuck_pgs(cls, summary_list):
+        """Return the count of stuck/inactive pgs from health summary data.
+
+        :param summary_list: (list) summary entries from a ceph health call
+        :return: (int) pg count parsed from the matching HEALTH_ERR entry
+                 (leading word of the summary text), or 0 if none found
+        """
+        bad_pg_words = ['pgs', 'stuck', 'inactive']
+        stuck_pgs = 0
+        for summary_data in summary_list:
+            if summary_data.get('severity') != 'HEALTH_ERR':
+                continue
+            # only entries containing all of 'pgs', 'stuck' and 'inactive'
+            # are counted; later matches overwrite earlier ones
+            if all(trigger in summary_data.get('summary')
+                   for trigger in bad_pg_words):
+                stuck_pgs = int(summary_data.get('summary').split()[0])
+
+        return stuck_pgs
+
+    def _mon_health_new(self):
+        """Gather cluster health for Luminous+ (ceph >= 12).
+
+        Per-mon state is derived from 'mon_status' quorum membership:
+        0 = in quorum, 4 = out of quorum.
+        :return: (dict) cluster health/state metrics incl. 'mon_status'
+        """
+
+        # NOTE(review): health_data is returned by the common helper but
+        # unused here — only the new-style mon_status output is needed
+        cluster, health_data = self._mon_health_common()
+
+        mon_status_output = self._mon_command('mon_status')
+        quorum_list = mon_status_output.get('quorum')
+        mon_list = mon_status_output.get('monmap').get('mons')
+        mon_status = {}
+        for mon in mon_list:
+            state = 0 if mon.get('rank') in quorum_list else 4
+            mon_status[mon.get('name')] = state
+
+        cluster['mon_status'] = mon_status
+
+        return cluster
+
+    def _mon_health_common(self):
+        """Collect health/state data shared by both health strategies.
+
+        :return: (tuple) (cluster metrics dict, raw health command output)
+        """
+
+        # for v12 (Luminous and beyond) add the following setting to
+        # ceph.conf "mon_health_preluminous_compat=true"
+        # this will provide the same output as pre-luminous
+
+        cluster_data = self._admin_socket().get('cluster')
+        pg_data = self._mon_command("pg stat")
+        health_data = self._mon_command("health")
+        # Luminous renamed 'overall_status' to 'status'; accept either
+        health_text = health_data.get('overall_status',
+                                      health_data.get('status', ''))
+
+        # rename the admin-socket keys to their display names
+        cluster = {Mon.cluster_metrics[k][0]: cluster_data[k]
+                   for k in cluster_data}
+
+        # unknown health strings fall back to 16 (worst-case bucket);
+        # Mon.health is a class-level lookup defined earlier in this file
+        health_num = Mon.health.get(health_text, 16)
+
+        cluster['health'] = health_num
+
+        pg_states = pg_data.get('num_pg_by_state')  # list of dict name,num
+        health_summary = health_data.get('summary', [])  # list of issues
+        cluster['num_pgs_stuck'] = Mon.check_stuck_pgs(health_summary)
+        cluster['features'] = Mon.get_feature_state(health_summary,
+                                                    pg_states)
+
+        self.logger.debug(
+            'Features:{}'.format(json.dumps(cluster['features'])))
+
+        return cluster, health_data
+
+    def _mon_health(self):
+        """Gather cluster health for pre-Luminous clusters (ceph < 12).
+
+        Per-mon state comes from the 'health_services' section of the
+        health command output.
+        :return: (dict) cluster health/state metrics incl. 'mon_status'
+        """
+
+        cluster, health_data = self._mon_health_common()
+
+        services = health_data.get('health').get('health_services')
+        monstats = {}
+        for svc in services:
+            if 'mons' in svc:
+                # Each monitor will have a numeric value denoting health
+                monstats = { mon.get('name'): Mon.health.get(mon.get('health'))
+                             for mon in svc.get('mons')}
+
+        cluster['mon_status'] = monstats
+
+        return cluster
+
+    @classmethod
+    def _seed(cls, metrics):
+        """Return a dict of every display name in *metrics* zeroed out."""
+        return {metrics[key][0]: 0 for key in metrics}
+
+    def display_names(self, metric_format, metrics):
+        """
+        convert the keys to the static descriptions
+        :param metric_format: (dict) key -> (display name, type) lookup
+        :param metrics: (dict) raw stats keyed by source name (may be
+               empty/None)
+        :return: (dict) same values re-keyed by display name, or {} when
+                 metrics is falsy
+        """
+        return {metric_format[k][0]: metrics[k]
+                for k in metrics} if metrics else {}
+
+    def _get_pool_stats(self):
+        """ get pool stats from rados
+
+        :return: (dict) pool name -> client I/O + recovery metrics, with
+                 missing sections zero-seeded so every pool emits the
+                 full metric set
+        """
+
+        raw_stats = self._mon_command('osd pool stats')
+        pool_stats = {}
+
+        # process each pool
+        for pool in raw_stats:
+
+            # '.' would collide with the flattened metric-name separator
+            pool_name = pool['pool_name'].replace('.', '_')
+            client_io = self.display_names(Mon.pool_client_metrics,
+                                           pool.get('client_io_rate'))
+            recovery = self.display_names(Mon.pool_recovery_metrics,
+                                          pool.get('recovery_rate'))
+
+            pool_md = {}
+            if client_io:
+
+                # Add pool level aggregation
+                client_io['bytes_sec'] = client_io.get('read_bytes_sec', 0) + \
+                    client_io.get('write_bytes_sec', 0)
+                client_io["op_per_sec"] = client_io.get('read_op_per_sec', 0)+ \
+                    client_io.get('write_op_per_sec', 0)
+                pool_md = client_io
+
+            else:
+                # idle pool: emit zeros so the series doesn't go missing
+                pool_md = Mon._seed(Mon.pool_client_metrics)
+
+            if recovery:
+                pool_md = common.merge_dicts(pool_md, recovery)
+            else:
+                pool_md = common.merge_dicts(pool_md, Mon._seed(
+                    Mon.pool_recovery_metrics))
+
+            pool_stats[pool_name] = pool_md
+
+        return pool_stats
+
+    def _get_osd_states(self):
+        """Fetch per-OSD up/in state via 'osd dump'.
+
+        :return: (tuple) (# distinct OSD host addresses,
+                 dict of osd id (str) -> {'up': .., 'in': ..})
+        """
+
+        self.logger.debug("fetching osd states from the local mon")
+        raw = self._mon_command('osd dump')
+        osd_hosts = set()
+        osds = {}
+        for osd in raw.get('osds'):
+            # count hosts by unique cluster-network IP (strip the port)
+            cluster_addr = osd.get('cluster_addr').split(':')[0]
+            osd_hosts.add(cluster_addr)
+
+            # NB. The key for the osds dict must be a string as the dict is
+            # flattened when the metric name is derived in the parent collectd
+            # module. If it is not converted, you get a TypeError
+            osds[str(osd.get('osd'))] = {"up": osd.get('up'),
+                                         "in": osd.get('in')}
+
+        return len(osd_hosts), osds
+
+    @staticmethod
+    def _select_pools(pools, mons):
+        """
+        determine the pools this mon should scan based on it's name. We select
+        pools from the an offset into the pool list, and then repeat at an
+        interval set by # mons in the configuration. This splits up the pools
+        we have, so each mon looks at a discrete set of pools instead of all
+        mons performing all scans.
+        :param pools: (list) rados pool names
+        :param mons: (list) monitor names from ceph health
+        :return: (list) of pools this monitor should scan. empty list if the
+                 monitor name mismatches - so no scans done
+        """
+
+        pools_to_scan = []
+
+        try:
+            # this host's position in the mon list is its stride offset
+            freq = mons.index(common.get_hostname())
+        except ValueError:
+            # this host's name is not in the monitor list?
+            # twilight zone moment
+            pass
+        else:
+
+            # NB: xrange — this module is Python 2 only
+            pools_to_scan = [pools[ptr]
+                             for ptr in xrange(freq, len(pools), len(mons))]
+
+        return pools_to_scan
+
+    def get_pools(self):
+        """Return sorted rados pool names, excluding rgw internal pools.
+
+        :return: (list) pool names not starting with 'default.rgw'
+        """
+        # NOTE(review): ('default.rgw') is a plain string, not a 1-tuple
+        # (no trailing comma) — str.startswith accepts either, so the
+        # filter still works, but the parentheses are misleading
+        skip_pools = ('default.rgw')
+
+        start = time.time()
+        conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
+        with rados.Rados(conffile=conf_file) as cluster:
+            rados_pools = sorted(cluster.list_pools())
+        end = time.time()
+
+        self.logger.debug('lspools took {:.3f}s'.format(end - start))
+
+        filtered_pools = [pool for pool in rados_pools
+                          if not pool.startswith(skip_pools)]
+
+        return filtered_pools
+
+    def _get_rbds(self, monitors):
+        """Count rbd images in the pools assigned to this monitor.
+
+        Scans run in parallel, one RBDScanner thread per pool.
+        :param monitors: (dict) mon name -> state, keys are used to
+               split the pool list across monitors
+        :return: (int) total rbd images found by this monitor's scans
+        """
+
+        pool_list = self.get_pools()
+        mon_list = sorted(monitors.keys())
+        my_pools = Mon._select_pools(pool_list, mon_list)
+        self.logger.debug("Pools to be scanned on this mon"
+                          " : {}".format(','.join(my_pools)))
+        threads = []
+
+        start = time.time()
+
+        for pool in my_pools:
+            thread = RBDScanner(self.cluster_name, pool)
+            thread.start()
+            threads.append(thread)
+
+        # wait for all threads to complete
+        # NOTE(review): join(1) waits at most 1s per thread — a slow scan
+        # may still be running when its num_rbds is read below; confirm
+        # this best-effort behaviour is intended
+        for thread in threads:
+            thread.join(1)
+
+        end = time.time()
+        self.logger.debug("rbd scans {:.3f}s".format((end - start)))
+
+        total_rbds = sum([thread.num_rbds for thread in threads])
+        self.logger.debug("total rbds found : {}".format(total_rbds))
+
+        # NOTE(review): del only unbinds the loop variable; the thread
+        # objects stay referenced by 'threads' until it goes out of scope
+        for thread in threads:
+            del thread
+
+        return total_rbds
+
+    def get_stats(self):
+        """
+        method associated with the plugin callback to gather the metrics
+        :return: (dict) metadata describing the state of the mon/osd's,
+                 keyed under 'mon'
+        """
+
+        start = time.time()
+
+        pool_stats = self._get_pool_stats()
+        num_osd_hosts, osd_states = self._get_osd_states()
+        # get_mon_health is bound in __init__ to the version-appropriate
+        # implementation (_mon_health or _mon_health_new)
+        cluster_state = self.get_mon_health()
+        cluster_state['num_osd_hosts'] = num_osd_hosts
+        cluster_state['num_rbds'] = self._get_rbds(cluster_state['mon_status'])
+
+        all_stats = common.merge_dicts(cluster_state, {"pools": pool_stats,
+                                       "osd_state": osd_states})
+
+        end = time.time()
+        self.logger.info("mon get_stats call : {:.3f}s".format((end - start)))
+
+        return {"mon": all_stats}
+
--- /dev/null
+#!/usr/bin/env python
+
+import os
+import time
+import math
+
+from cephmetrics_collectors import (base, common)
+
+__author__ = "Paul Cuzner"
+
+
+class OSDstats(object):
+    """Holds capacity and (for filestore) latency stats for one OSD."""
+
+    # capacity counters pulled from the OSD admin socket:
+    # source key -> (display name, collectd type)
+    osd_capacity = {
+        "stat_bytes": ("stat_bytes", "gauge"),
+        "stat_bytes_used": ("stat_bytes_used", "gauge"),
+        "stat_bytes_avail": ("stat_bytes_avail", "gauge")
+    }
+
+    # filestore perf counters of interest (a set — names only; each is a
+    # dict of sum/avgcount in the perf dump)
+    filestore_metrics = {
+        "journal_latency",
+        "commitcycle_latency",
+        "apply_latency",
+        "queue_transaction_latency_avg"
+    }
+
+    def __init__(self, osd_type='filestore'):
+        # _current/_previous hold consecutive perf-dump samples so
+        # deltas can be computed in update()
+        self._current = {}
+        self._previous = {}
+        self._osd_type = osd_type
+        self.osd_percent_used = 0
+
+    def update(self, stats):
+        """
+        update the objects attributes based on the dict
+        :param stats: (dict) containing filestore performance ('filestore')
+               and capacity info ('osd')
+        :return: None
+        """
+
+        # rotate the samples; on the first call there is no previous
+        if self._current:
+            self._previous = self._current
+            self._current = stats['filestore']
+        else:
+            self._current = stats['filestore']
+
+        for attr in OSDstats.filestore_metrics:
+
+            if self._previous:
+                # counters are cumulative, so derive the per-interval
+                # average latency from the sum/avgcount deltas
+                d_sum = self._current[attr].get('sum') - \
+                    self._previous[attr].get('sum')
+                d_avgcount = self._current[attr].get('avgcount') - \
+                    self._previous[attr].get('avgcount')
+
+                if d_sum == 0 or d_avgcount == 0:
+                    val = 0
+                else:
+                    val = float(d_sum) / d_avgcount
+            else:
+                # no previous value, so set to 0
+                val = 0
+
+            setattr(self, attr, val)
+
+        # capacity values are copied straight onto the instance
+        # (creates stat_bytes / stat_bytes_used / stat_bytes_avail)
+        for attr in stats['osd']:
+            setattr(self, attr, stats['osd'].get(attr))
+
+        # NOTE(review): raises ZeroDivisionError if stat_bytes is 0 —
+        # presumably never true for a mounted OSD; confirm
+        self.osd_percent_used = math.ceil((float(self.stat_bytes_used) /
+                                           self.stat_bytes) * 100)
+
+
+class OSDs(base.BaseCollector):
+    """Collector for the OSDs hosted on this node.
+
+    Discovers OSD data/journal devices from /proc/mounts, then samples
+    /proc/diskstats and each OSD's admin socket on every get_stats call.
+    """
+
+    # union of the disk and iostat metric descriptions from common
+    all_metrics = common.merge_dicts(
+        common.Disk.metrics, common.IOstat.metrics)
+
+    def __init__(self, cluster_name, **kwargs):
+        base.BaseCollector.__init__(self, cluster_name, **kwargs)
+        # last sample time, used to compute the iostat interval
+        self.timestamp = int(time.time())
+
+        self.osd = {}  # dict of disk objects, each disk contains osd_id
+        self.jrnl = {}  # dict of journal devices (if not collocated)
+        self.osd_id_list = []
+        self.dev_lookup = {}  # dict dev_name -> osd | jrnl
+        self.osd_count = 0
+
+    def __repr__(self):
+        """Dump every non-private attribute of each tracked osd entry."""
+
+        s = ''
+        for disk in self.osd:
+            s += "{}\n".format(disk)
+            dev = self.osd[disk]
+
+            for var in vars(dev):
+                if not var.startswith('_'):
+                    s += "{} ... {}\n".format(var, getattr(dev, var))
+        return s
+
+    def _fetch_osd_stats(self, osd_id):
+        """Read filestore + capacity counters from one OSD's admin socket.
+
+        :param osd_id: (str) numeric id of the OSD
+        :return: (dict) {'filestore': {...}, 'osd': {...}}
+        :raises IOError: when the OSD's admin socket file is missing
+        """
+
+        # NB: osd stats are cumulative
+
+        stats = {}
+        osd_socket_name = '/var/run/ceph/{}-osd.{}.asok'.format(
+            self.cluster_name, osd_id)
+
+        if not os.path.exists(osd_socket_name):
+            # all OSD's should expose an admin socket, so if it's missing
+            # this node has a problem!
+            raise IOError("Socket file missing for OSD {}".format(osd_id))
+
+        self.logger.debug("fetching osd stats for osd {}".format(osd_id))
+        resp = self._admin_socket(socket_path=osd_socket_name)
+
+        filestore_stats = resp.get('filestore')
+        stats['filestore'] = {key_name: filestore_stats.get(key_name)
+                              for key_name in OSDstats.filestore_metrics}
+
+        osd_stats = resp.get('osd')
+
+        # Add disk usage stats
+        stats['osd'] = {key_name: osd_stats.get(key_name)
+                        for key_name in OSDstats.osd_capacity.keys()}
+
+        return stats
+
+    @staticmethod
+    def get_osd_type(osd_path):
+        """Return 'filestore' or 'bluestore' for the OSD mounted at osd_path.
+
+        Reads the 'type' file when present; otherwise the presence of a
+        'journal' entry implies filestore.
+        :raises ValueError: when neither indicator exists
+        """
+
+        osd_type_fname = os.path.join(osd_path, 'type')
+        if os.path.exists(osd_type_fname):
+            return common.fread(osd_type_fname)
+        else:
+            if os.path.exists(os.path.join(osd_path, 'journal')):
+                return "filestore"
+            else:
+                raise ValueError("Unrecognised OSD type")
+
+    def _dev_to_osd(self):
+        """
+        Look at the system to determine which disks are acting as OSD's
+        """
+
+        # the logic here uses the mount points to determine which OSD's are
+        # in the system. The encryption state is determined just by the use
+        # of devicemapper (i.e. /dev/mapper prefixed devices) - since at this
+        # time this is all dm is used for.
+
+        osd_indicators = {'var', 'lib', 'osd'}
+
+        for mnt in common.freadlines('/proc/mounts'):
+            items = mnt.split(' ')
+            dev_path, path_name = items[:2]
+            if path_name.startswith('/var/lib'):
+                # take a close look since this is where ceph osds usually
+                # get mounted
+                dirs = set(path_name.split('/'))
+                if dirs.issuperset(osd_indicators):
+
+                    # get the osd_id from the name is the most simple way
+                    # to get the id, due to naming conventions. If this fails
+                    # though, plan 'b' is the whoami file
+                    osd_id = path_name.split('-')[-1]
+                    if not osd_id.isdigit():
+                        osd_id = common.fread(
+                            os.path.join(path_name, 'whoami'))
+
+                    if osd_id not in self.osd:
+                        osd_type = OSDs.get_osd_type(path_name)
+                        self.osd[osd_id] = OSDstats(osd_type=osd_type)
+                        self.osd_id_list.append(osd_id)
+
+                        osd_type = self.osd[osd_id]._osd_type
+                        if osd_type == 'filestore':
+                            if dev_path.startswith('/dev/mapper'):
+                                # dm device -> encrypted; resolve the
+                                # underlying partition via by-partuuid
+                                encrypted = 1
+                                uuid = dev_path.split('/')[-1]
+                                partuuid = '/dev/disk/by-partuuid/{}'.format(
+                                    uuid)
+                                dev_path = os.path.realpath(partuuid)
+                                osd_device = dev_path.split('/')[-1]
+                            else:
+                                encrypted = 0
+                                osd_device = dev_path.split('/')[-1]
+
+                        elif osd_type == 'bluestore':
+                            block_link = os.path.join(path_name, 'block')
+                            osd_path = os.path.realpath(block_link)
+                            osd_device = osd_path.split('/')[-1]
+                            encrypted = 0
+                        else:
+                            raise ValueError("Unknown OSD type encountered")
+
+                        # if the osd_id hasn't been seen neither has the
+                        # disk
+                        # NOTE(review): self.osd holds OSDstats keyed by
+                        # osd_id AND Disk objects keyed by device name;
+                        # dev_lookup disambiguates at sample time
+                        self.osd[osd_device] = common.Disk(
+                            osd_device,
+                            path_name=path_name,
+                            osd_id=osd_id,
+                            in_osd_type=osd_type,
+                            encrypted=encrypted)
+                        self.dev_lookup[osd_device] = 'osd'
+                        self.osd_count += 1
+
+                        if osd_type == 'filestore':
+                            journal_link = os.path.join(path_name, 'journal')
+                        else:
+                            journal_link = os.path.join(path_name, 'block.wal')
+
+                        if os.path.exists(journal_link):
+                            link_tgt = os.readlink(journal_link)
+                            if link_tgt.startswith('/dev/mapper'):
+                                encrypted = 1
+                            else:
+                                encrypted = 0
+
+                            partuuid_path = os.path.join(
+                                '/dev/disk/by-partuuid',
+                                link_tgt.split('/')[-1])
+                            jrnl_path = os.path.realpath(partuuid_path)
+                            jrnl_dev = jrnl_path.split('/')[-1]
+
+                            # NOTE(review): checks membership in self.osd,
+                            # not self.jrnl — presumably to skip journals
+                            # collocated on an OSD data device; confirm
+                            if jrnl_dev not in self.osd:
+                                self.jrnl[jrnl_dev] = common.Disk(
+                                    jrnl_dev,
+                                    osd_id=osd_id,
+                                    in_osd_type=osd_type,
+                                    encrypted=encrypted)
+
+                                self.dev_lookup[jrnl_dev] = 'jrnl'
+
+                        else:
+                            # No journal or WAL link..?
+                            pass
+
+    def _stats_lookup(self):
+        """
+        Grab the disk stats from /proc/diskstats, and the key osd perf dump
+        counters
+        """
+
+        now = time.time()
+        interval = int(now) - self.timestamp
+        self.timestamp = int(now)
+
+        # Fetch diskstats from the OS
+        for perf_entry in common.freadlines('/proc/diskstats'):
+
+            field = perf_entry.split()
+            dev_name = field[2]
+
+            # only devices discovered by _dev_to_osd are sampled
+            device = None
+            if self.dev_lookup.get(dev_name, None) == 'osd':
+                device = self.osd[dev_name]
+            elif self.dev_lookup.get(dev_name, None) == 'jrnl':
+                device = self.jrnl[dev_name]
+
+            if device:
+                new_stats = field[3:]
+
+                # rotate samples so compute() can work on deltas
+                if device.perf._current:
+                    device.perf._previous = device.perf._current
+                    device.perf._current = new_stats
+                else:
+                    device.perf._current = new_stats
+
+                device.perf.compute(interval)
+                device.refresh()
+
+        end = time.time()
+        self.logger.debug("OS disk stats calculated in "
+                          "{:.4f}s".format(end-now))
+
+        # fetch stats from each osd daemon
+        osd_stats_start = time.time()
+        for osd_id in self.osd_id_list:
+
+            # bluestore OSDs expose no filestore perf counters, so skip
+            if self.osd[osd_id]._osd_type == 'filestore':
+                osd_stats = self._fetch_osd_stats(osd_id)
+
+                # self.logger.debug('stats : {}'.format(osd_stats))
+
+                osd_device = self.osd[osd_id]
+                osd_device.update(osd_stats)
+            else:
+                self.logger.debug("skipped 'bluestore' osd perf collection "
+                                  "for osd.{}".format(osd_id))
+
+        osd_stats_end = time.time()
+        self.logger.debug(
+            "OSD perf dump stats collected for {} OSDs in {:.3f}s".format(
+                len(self.osd_id_list), (osd_stats_end - osd_stats_start)))
+
+    @staticmethod
+    def _dump_devs(device_dict):
+        """Convert each device object in device_dict to a plain dict."""
+
+        dumped = {}
+
+        for dev_name in sorted(device_dict):
+            device = device_dict[dev_name]
+            dumped[dev_name] = common.todict(device)
+
+        return dumped
+
+    def dump(self):
+        """
+        dump the osd object(s) to a dict. The object *must* not have references
+        to other objects - if this rule is broken cephmetrics caller will fail
+        when parsing the dict
+
+        :return: (dict) dictionary representation of this OSDs on this host
+        """
+
+        return {
+            "num_osds": self.osd_count,
+            "osd": OSDs._dump_devs(self.osd),
+            "jrnl": OSDs._dump_devs(self.jrnl)
+        }
+
+    def get_stats(self):
+        """Plugin callback: refresh device discovery, sample, and dump.
+
+        :return: (dict) see dump()
+        """
+
+        start = time.time()
+
+        self._dev_to_osd()
+        self._stats_lookup()
+
+        end = time.time()
+
+        self.logger.info("osd get_stats call "
+                         ": {:.3f}s".format((end - start)))
+
+        return self.dump()
--- /dev/null
+#!/usr/bin/env python
+
+import time
+
+from cephmetrics_collectors import (base, common)
+
+__author__ = "paul.cuzner@redhat.com"
+
+
+class RGW(base.BaseCollector):
+    """Collector for the local radosgw daemon's perf counters."""
+
+    # flat counters taken directly from the perf dump:
+    # source key -> (display name, collectd type)
+    simple_metrics = {
+        "req": ("requests", "derive"),
+        "failed_req": ("requests_failed", "derive"),
+        "get": ("gets", "derive"),
+        "get_b": ("get_bytes", "derive"),
+        "put": ("puts", "derive"),
+        "put_b": ("put_bytes", "derive"),
+        "qlen": ("qlen", "derive"),
+        "qactive": ("requests_active", "derive")
+    }
+
+    # nested latency counters; each expands to <name>_sum and
+    # <name>_avgcount in _filter()
+    int_latencies = [
+        "get_initial_lat",
+        "put_initial_lat"
+    ]
+
+    # descriptions for the expanded latency keys produced by _filter()
+    latencies = {
+        "get_initial_lat_sum": ("get_initial_lat_sum", "derive"),
+        "get_initial_lat_avgcount": ("get_initial_lat_avgcount", "derive"),
+        "put_initial_lat_sum": ("put_initial_lat_sum", "derive"),
+        "put_initial_lat_avgcount": ("put_initial_lat_avgcount", "derive")
+    }
+
+    all_metrics = common.merge_dicts(simple_metrics, latencies)
+
+    def __init__(self, cluster_name, admin_socket, **kwargs):
+        base.BaseCollector.__init__(self, cluster_name, admin_socket, **kwargs)
+        self.host_name = common.get_hostname()
+
+    def _get_rgw_data(self):
+        """Return this host's rgw section of the admin-socket perf dump.
+
+        NOTE(review): assumes the rgw instance is named after the host
+        ('client.rgw.<hostname>') — returns None otherwise; confirm
+        against deployment naming.
+        """
+
+        response = self._admin_socket()
+
+        key_name = 'client.rgw.{}'.format(self.host_name)
+
+        return response.get(key_name)
+
+    def _filter(self, stats):
+        """Reduce the raw perf dump to the metrics this collector emits.
+
+        :param stats: (dict) raw rgw perf counters
+        :return: (dict) simple counters plus flattened latency sub-keys
+        """
+        # pick out the simple metrics
+
+        filtered = {key: stats[key] for key in RGW.simple_metrics}
+
+        # flatten each latency dict (sum/avgcount) into '<key>_<attr>'
+        for key in RGW.int_latencies:
+            for _attr in stats[key]:
+                new_key = "{}_{}".format(key, _attr)
+                filtered[new_key] = stats[key].get(_attr)
+
+        return filtered
+
+    def get_stats(self):
+        """Plugin callback: fetch and filter rgw stats.
+
+        :return: (dict) filtered counters keyed under 'rgw'
+        """
+
+        start = time.time()
+
+        raw_stats = self._get_rgw_data()
+
+        stats = self._filter(raw_stats)
+
+        end = time.time()
+
+        self.logger.info("RGW get_stats : {:.3f}s".format((end - start)))
+
+        return {"rgw": stats}