From: Zack Cerza Date: Thu, 10 Aug 2017 16:05:35 +0000 (-0700) Subject: Separate the collectd plugin from the collectors X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=24caf245217b30f076cfd71b89e91850e64822c9;p=cephmetrics.git Separate the collectd plugin from the collectors This will enable me to, in a later PR, properly package the collectors so that they can be placed in site-packages and used without collectd Signed-off-by: Zack Cerza --- diff --git a/ansible/roles/ceph-collectd/files/cephmetrics b/ansible/roles/ceph-collectd/files/cephmetrics deleted file mode 120000 index e2d0dbe..0000000 --- a/ansible/roles/ceph-collectd/files/cephmetrics +++ /dev/null @@ -1 +0,0 @@ -../../../../cephmetrics/ \ No newline at end of file diff --git a/ansible/roles/ceph-collectd/files/cephmetrics.py b/ansible/roles/ceph-collectd/files/cephmetrics.py new file mode 120000 index 0000000..8de2567 --- /dev/null +++ b/ansible/roles/ceph-collectd/files/cephmetrics.py @@ -0,0 +1 @@ +../../../../cephmetrics.py \ No newline at end of file diff --git a/ansible/roles/ceph-collectd/files/cephmetrics_collectors b/ansible/roles/ceph-collectd/files/cephmetrics_collectors new file mode 120000 index 0000000..7451887 --- /dev/null +++ b/ansible/roles/ceph-collectd/files/cephmetrics_collectors @@ -0,0 +1 @@ +../../../../cephmetrics_collectors \ No newline at end of file diff --git a/ansible/roles/ceph-collectd/tasks/configure_collectd.yml b/ansible/roles/ceph-collectd/tasks/configure_collectd.yml index d4df132..a967c86 100644 --- a/ansible/roles/ceph-collectd/tasks/configure_collectd.yml +++ b/ansible/roles/ceph-collectd/tasks/configure_collectd.yml @@ -51,5 +51,5 @@ replace: dest: "{{ collectd_conf_d }}/cephmetrics.conf" regexp: 'ModulePath ".*"' - replace: 'ModulePath "{{ collectd_cephmetrics_dir }}"' + replace: 'ModulePath "{{ collectd_dir }}"' notify: Restart collectd diff --git a/ansible/roles/ceph-collectd/tasks/install_collectd_plugins.yml 
b/ansible/roles/ceph-collectd/tasks/install_collectd_plugins.yml index 2deb625..ba0fff8 100644 --- a/ansible/roles/ceph-collectd/tasks/install_collectd_plugins.yml +++ b/ansible/roles/ceph-collectd/tasks/install_collectd_plugins.yml @@ -3,21 +3,26 @@ set_fact: collectd_dir: "/usr/lib{{ '64' if ansible_pkg_mgr == 'yum' else '' }}/collectd" -- name: Set collectd_cephmetrics_dir +- name: Set collectors_dir set_fact: - collectd_cephmetrics_dir: "{{ collectd_dir }}/cephmetrics" + collectors_dir: "{{ collectd_dir }}/cephmetrics_collectors" - name: Remove stale Python files file: dest: "{{ item }}" state: absent with_items: - - "{{ collectd_cephmetrics_dir }}/cephmetrics.py" - - "{{ collectd_cephmetrics_dir }}/cephmetrics.pyc" - - "{{ collectd_cephmetrics_dir }}/collectors" + - "{{ collectd_dir }}/cephmetrics" + - "{{ collectors_dir }}/collectors" -- name: Ship collector plugins +- name: Ship collectors copy: - src: files/cephmetrics - dest: "{{ collectd_cephmetrics_dir }}" + src: files/cephmetrics_collectors/ + dest: "{{ collectors_dir }}" + notify: Restart collectd + +- name: Ship collectd plugin + copy: + src: files/cephmetrics.py + dest: "{{ collectd_dir }}" notify: Restart collectd diff --git a/cephmetrics.py b/cephmetrics.py new file mode 100644 index 0000000..afb2574 --- /dev/null +++ b/cephmetrics.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python +import logging +import os +import sys + +TEST_MODE = (sys.argv[0].split('/')[-1] == 'py.test') +try: + import collectd +except ImportError: + if not TEST_MODE: + raise + +from cephmetrics_collectors import (Ceph, common, iscsi, mon, osd, rgw) + +__author__ = 'Paul Cuzner' + +PLUGIN_NAME = 'cephmetrics' + + +def write_stats(role_metrics, stats): + + flat_stats = common.flatten_dict(stats, '.') + for key_name in flat_stats: + attr_name = key_name.split('.')[-1] + + # TODO: this needs some more think time, since the key from the name + # is not the key of the all_metrics dict + if attr_name in role_metrics: + attr_type = 
role_metrics[attr_name][1] # gauge / derive etc + else: + # assign a default + attr_type = 'gauge' + + attr_value = flat_stats[key_name] + + val = collectd.Values(plugin=PLUGIN_NAME, type=attr_type) + instance_name = "{}.{}".format(CEPH.cluster_name, + key_name) + val.type_instance = instance_name + val.values = [attr_value] + val.dispatch() + + +def configure_callback(conf): + + valid_log_levels = ['debug', 'info'] + + global CEPH + module_parms = {node.key: node.values[0] for node in conf.children} + + log_level = module_parms.get('LogLevel', 'debug') + if log_level not in valid_log_levels: + collectd.error("LogLevel specified is invalid - must" + " be :{}".format(' or '.join(valid_log_levels))) + + if 'ClusterName' in module_parms: + cluster_name = module_parms['ClusterName'] + # cluster name is all we need to get started + if not os.path.exists('/etc/ceph/{}.conf'.format(cluster_name)): + collectd.error("Clustername given ('{}') not found in " + "/etc/ceph".format(module_parms['ClusterName'])) + + # let's assume the conf file is OK to use + CEPH.cluster_name = cluster_name + + setup_module_logging(log_level) + + CEPH.probe() + collectd.info( + "{}: Roles detected - mon:{} osd:{} rgw:{} iscsi:{}".format( + __name__, + isinstance(CEPH.mon, mon.Mon), + isinstance(CEPH.osd, osd.OSDs), + isinstance(CEPH.rgw, rgw.RGW), + isinstance(CEPH.iscsi, iscsi.ISCSIGateway), + )) + + else: + collectd.error("ClusterName is required") + + +def setup_module_logging(log_level, path='/var/log/collectd-cephmetrics.log'): + + level = {"debug": logging.DEBUG, + "info": logging.INFO} + + logging.getLogger('cephmetrics') + log_conf = dict( + format='%(asctime)s - %(levelname)-7s - ' + '[%(filename)s:%(lineno)s:%(funcName)s() - ' + '%(message)s', + level=level.get(log_level) + ) + if path: + log_conf['filename'] = path + logging.basicConfig(**log_conf) + + +def read_callback(): + + if CEPH.mon: + mon_stats = CEPH.mon.get_stats() + write_stats(mon.Mon.all_metrics, mon_stats) + + if 
CEPH.rgw: + rgw_stats = CEPH.rgw.get_stats() + write_stats(rgw.RGW.all_metrics, rgw_stats) + + if CEPH.osd: + osd_node_stats = CEPH.osd.get_stats() + write_stats(osd.OSDs.all_metrics, osd_node_stats) + + if CEPH.iscsi: + iscsi_stats = CEPH.iscsi.get_stats() + write_stats(iscsi.ISCSIGateway.metrics, iscsi_stats) + + +if TEST_MODE: + pass +else: + CEPH = Ceph() + collectd.register_config(configure_callback) + collectd.register_read(read_callback) diff --git a/cephmetrics/__init__.py b/cephmetrics/__init__.py deleted file mode 100644 index 4f230d3..0000000 --- a/cephmetrics/__init__.py +++ /dev/null @@ -1,172 +0,0 @@ -#!/usr/bin/env python -import sys -TEST_MODE = (sys.argv[0].split('/')[-1] == 'py.test') - -import os -import glob -import logging - -try: - import collectd -except ImportError: - if not TEST_MODE: - raise - -from collectors import (common, iscsi, mon, osd, rgw) - -__author__ = 'Paul Cuzner' - -PLUGIN_NAME = 'cephmetrics' - - -class Ceph(object): - def __init__(self): - self.cluster_name = None - self.host_name = common.get_hostname() - - self.mon_socket = None - self.rgw_socket = None - - self.mon = None - self.rgw = None - self.osd = None - self.iscsi = None - - def probe(self): - """ - set up which collector(s) to use, based on what types of sockets we - find in /var/run/ceph - """ - - mon_socket = '/var/run/ceph/{}-mon.{}.asok'.format(self.cluster_name, - self.host_name) - if os.path.exists(mon_socket): - self.mon_socket = mon_socket - self.mon = mon.Mon(self.cluster_name, - admin_socket=mon_socket) - - rgw_socket_list = glob.glob('/var/run/ceph/{}-client.rgw.*.' 
- 'asok'.format(self.cluster_name)) - - if rgw_socket_list: - rgw_socket = rgw_socket_list[0] - self.rgw = rgw.RGW(self.cluster_name, - admin_socket=rgw_socket) - - osd_socket_list = glob.glob('/var/run/ceph/{}-osd.*' - '.asok'.format(self.cluster_name)) - mounted = common.freadlines('/proc/mounts') - osds_mounted = [mnt for mnt in mounted - if mnt.split()[1].startswith('/var/lib/ceph')] - if osd_socket_list or osds_mounted: - self.osd = osd.OSDs(self.cluster_name) - - if os.path.exists('/sys/kernel/config/target/iscsi'): - self.iscsi = iscsi.ISCSIGateway(self.cluster_name) - - collectd.info( - "{}: Roles detected - mon:{} osd:{} rgw:{} iscsi:{}".format( - __name__, - isinstance(self.mon, mon.Mon), - isinstance(self.osd, osd.OSDs), - isinstance(self.rgw, rgw.RGW), - isinstance(self.iscsi, iscsi.ISCSIGateway), - )) - - -def write_stats(role_metrics, stats): - - flat_stats = common.flatten_dict(stats, '.') - for key_name in flat_stats: - attr_name = key_name.split('.')[-1] - - # TODO: this needs some more think time, since the key from the name - # is not the key of the all_metrics dict - if attr_name in role_metrics: - attr_type = role_metrics[attr_name][1] # gauge / derive etc - else: - # assign a default - attr_type = 'gauge' - - attr_value = flat_stats[key_name] - - val = collectd.Values(plugin=PLUGIN_NAME, type=attr_type) - instance_name = "{}.{}".format(CEPH.cluster_name, - key_name) - val.type_instance = instance_name - val.values = [attr_value] - val.dispatch() - - -def configure_callback(conf): - - valid_log_levels = ['debug', 'info'] - - global CEPH - module_parms = {node.key: node.values[0] for node in conf.children} - - log_level = module_parms.get('LogLevel', 'debug') - if log_level not in valid_log_levels: - collectd.error("LogLevel specified is invalid - must" - " be :{}".format(' or '.join(valid_log_levels))) - - if 'ClusterName' in module_parms: - cluster_name = module_parms['ClusterName'] - # cluster name is all we need to get started - if not 
os.path.exists('/etc/ceph/{}.conf'.format(cluster_name)): - collectd.error("Clustername given ('{}') not found in " - "/etc/ceph".format(module_parms['ClusterName'])) - - # let's assume the conf file is OK to use - CEPH.cluster_name = cluster_name - - setup_module_logging(log_level) - - CEPH.probe() - - else: - collectd.error("ClusterName is required") - - -def setup_module_logging(log_level, path='/var/log/collectd-cephmetrics.log'): - - level = {"debug": logging.DEBUG, - "info": logging.INFO} - - logging.getLogger('cephmetrics') - log_conf = dict( - format='%(asctime)s - %(levelname)-7s - ' - '[%(filename)s:%(lineno)s:%(funcName)s() - ' - '%(message)s', - level=level.get(log_level) - ) - if path: - log_conf['filename'] = path - logging.basicConfig(**log_conf) - - -def read_callback(): - - if CEPH.mon: - mon_stats = CEPH.mon.get_stats() - write_stats(mon.Mon.all_metrics, mon_stats) - - if CEPH.rgw: - rgw_stats = CEPH.rgw.get_stats() - write_stats(rgw.RGW.all_metrics, rgw_stats) - - if CEPH.osd: - osd_node_stats = CEPH.osd.get_stats() - write_stats(osd.OSDs.all_metrics, osd_node_stats) - - if CEPH.iscsi: - iscsi_stats = CEPH.iscsi.get_stats() - write_stats(iscsi.ISCSIGateway.metrics, iscsi_stats) - - -if TEST_MODE: - pass -else: - CEPH = Ceph() - collectd.register_config(configure_callback) - collectd.register_read(read_callback) diff --git a/cephmetrics/collectors/__init__.py b/cephmetrics/collectors/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/cephmetrics/collectors/base.py b/cephmetrics/collectors/base.py deleted file mode 100644 index f2f2295..0000000 --- a/cephmetrics/collectors/base.py +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env python - -import json -import time -import logging - -from ceph_daemon import admin_socket - - -class BaseCollector(object): - - def __init__(self, cluster_name, admin_socket=None): - self.cluster_name = cluster_name - self.admin_socket = admin_socket - - self.logger = logging.getLogger('cephmetrics') - 
- def _admin_socket(self, cmds=None, socket_path=None): - - adm_socket = self.admin_socket if not socket_path else socket_path - - if not cmds: - cmds = ['perf', 'dump'] - - start = time.time() - response = admin_socket(adm_socket, cmds, - format='json') - end = time.time() - - self.logger.debug("admin_socket call '{}' : " - "{:.3f}s".format(' '.join(cmds), - (end - start))) - - return json.loads(response) - - def get_stats(self): - - return {} diff --git a/cephmetrics/collectors/common.py b/cephmetrics/collectors/common.py deleted file mode 100644 index 8b94dcc..0000000 --- a/cephmetrics/collectors/common.py +++ /dev/null @@ -1,247 +0,0 @@ -#!/usr/bin/env python - - -import socket -from os import statvfs -import math - - -def get_hostname(): - return socket.gethostname().split('.')[0] - - -def add_dicts(dict1, dict2): - """ - Add dictionary values together - :param dict1: - :param dict2: - :return: dict with matching fields sum'd together - """ - return {key: dict1.get(key, 0) + dict2.get(key, 0) - for key in set(dict1).union(dict2)} - - -def merge_dicts(dict1, dict2): - """ - merges two dicts together to form a single dict. 
when dict keys overlap - the value in the 2nd dict takes precedence - :param dict1: - :param dict2: - :return: combined dict - """ - - new = dict1.copy() - new.update(dict2) - - return new - - -def flatten_dict(data, separator='.', prefix=''): - """ - flatten a dict, so it is just simple key/value pairs - :param data: (dict) - :param separator: (str) char to use when combining keys - :param prefix: key prefix - :return: - """ - return {prefix + separator + k if prefix else k: v - for kk, vv in data.items() - for k, v in flatten_dict(vv, separator, kk).items() - } if isinstance(data, dict) else {prefix: data} - - -def todict(obj): - """ - convert an object to a dict representation - :param obj: (object) object to examine, to extract variables/values from - :return: (dict) representation of the given object - """ - data = {} - for key, value in obj.__dict__.iteritems(): - - if key.startswith('_'): - continue - - try: - data[key] = todict(value) - except AttributeError: - data[key] = value - - return data - - -def fread(file_name=None): - """ - Simple read function for files of a single value - :param file_name: (str) file name to read - :return: (str) contents of the file - """ - - with open(file_name, 'r') as f: - setting = f.read().rstrip() - return setting - - -def freadlines(file_name=None): - """ - simple readlines function to return all records of a given file - :param file_name: (str) file name to read - :return: (list) contents of the file - """ - - with open(file_name, 'r') as f: - data = f.readlines() - return data - - -class IOstat(object): - raw_metrics = [ - "_reads", - "_reads_mrgd", - "_sectors_read", - "_read_ms", - "_writes", - "_writes_mrgd", - "_sectors_written", - "_write_ms", - "_current_io", - "_ms_active_io", - "_ms_active_io_w" - ] - - sector_size = 512 - - metrics = { - "iops": ("iops", "gauge"), - "r_iops": ("r_iops", "gauge"), - "w_iops": ("w_iops", "gauge"), - "bytes_per_sec": ("bytes_per_sec", "gauge"), - "r_bytes_per_sec": 
("r_bytes_per_sec", "gauge"), - "w_bytes_per_sec": ("w_bytes_per_sec", "gauge"), - "util": ("util", "gauge"), - "await": ("await", "gauge"), - "r_await": ("r_await", "gauge"), - "w_await": ("w_await", "gauge"), - } - - def __init__(self): - self._previous = [] - self._current = [] - - # Seed the metrics we're interested in - for ctr in IOstat.metrics.keys(): - setattr(self, ctr, 0) - - def __str__(self): - s = '\n- IOstat object:\n' - for key in sorted(vars(self)): - s += '\t{} ... {}\n'.format(key, getattr(self, key)) - return s - - def _calc_raw_delta(self): - if not self._previous: - # nothing to compute yet - for ptr in range(len(IOstat.raw_metrics)): - key = IOstat.raw_metrics[ptr] - setattr(self, key, 0) - else: - for ptr in range(len(IOstat.raw_metrics)): - key = IOstat.raw_metrics[ptr] - setattr(self, key, (int(self._current[ptr]) - - int(self._previous[ptr]))) - - def compute(self, sample_interval): - """ - Calculate the iostats for this device - """ - - self._calc_raw_delta() - - if sample_interval > 0: - interval_ms = sample_interval * 1000 - total_io = self._reads + self._writes - self.util = float(self._ms_active_io) / interval_ms * 100 - self.iops = int(total_io) / sample_interval - self.r_iops = int(self._reads) / sample_interval - self.w_iops = int(self._writes) / sample_interval - self.await = (float(self._write_ms + self._read_ms) / total_io if - total_io > 0 else 0) - self.w_await = float( - self._write_ms) / self._writes if self._writes > 0 else 0 - self.r_await = float( - self._read_ms) / self._reads if self._reads > 0 else 0 - self.r_bytes_per_sec = (float( - self._sectors_read * IOstat.sector_size)) / sample_interval - self.w_bytes_per_sec = (float( - self._sectors_written * IOstat.sector_size)) / sample_interval - self.bytes_per_sec = self.r_bytes_per_sec + self.w_bytes_per_sec - - -class Disk(object): - - metrics = { - "rotational": ("rotational", "gauge"), - "disk_size": ("disk_size", "gauge"), - "fs_size": ("fs_size", "gauge"), - 
"fs_used": ("fs_used", "gauge"), - "fs_percent_used": ("fs_percent_used", "gauge"), - "osd_id": ("osd_id", "gauge") - } - - osd_types = {"filestore": 0, "bluestore": 1} - - def __init__(self, device_name, path_name=None, osd_id=None, - in_osd_type="filestore", encrypted=0): - - self._name = device_name - self._path_name = path_name - self._base_dev = Disk.get_base_dev(device_name) - self.osd_id = osd_id - - self.rotational = self._get_rota() - self.disk_size = self._get_size() - self.perf = IOstat() - self.fs_size = 0 - self.fs_percent_used = 0 - self.fs_used = 0 - self.encrypted = encrypted - self.osd_type = Disk.osd_types[in_osd_type] - - self.refresh() - - def _get_size(self): - return int(fread("/sys/block/{}/size".format(self._base_dev))) * 512 - - def _get_rota(self): - return int( - fread("/sys/block/{}/queue/rotational".format(self._base_dev))) - - def _get_fssize(self): - s = statvfs("{}/whoami".format(self._path_name)) - fs_size = s.f_blocks * s.f_bsize - fs_used = fs_size - (s.f_bfree * s.f_bsize) - fs_percent_used = math.ceil((float(fs_used) / fs_size) * 100) - return fs_size, fs_used, fs_percent_used - - def refresh(self): - # only run the fs size update, if the _path_name is set. - if self._path_name: - self.fs_size, self.fs_used, self.fs_percent_used = \ - self._get_fssize() - - @staticmethod - def get_base_dev(dev_name): - - # for intelcas devices, just use the device name as is - if dev_name.startswith('intelcas'): - device = dev_name - elif dev_name.startswith('nvme'): - if 'p' in dev_name: - device = dev_name[:(dev_name.index('p'))] - else: - device = dev_name - else: - # default strip any numeric ie. 
sdaa1 -> sdaa - device = filter(lambda ch: ch.isalpha(), dev_name) - - return device diff --git a/cephmetrics/collectors/iscsi.py b/cephmetrics/collectors/iscsi.py deleted file mode 100644 index 27dbceb..0000000 --- a/cephmetrics/collectors/iscsi.py +++ /dev/null @@ -1,255 +0,0 @@ -#!/usr/bin/env python2 - -# requires python-rtslib_fb for LIO interaction -# -# NB. the rtslib_fb module is dynamically loaded by the ISCSIGateway -# class instantiation. This prevents import errors within the generic parent -# module cephmetrics -# -import os -import sys -import time - -from cephmetrics.collectors import (base, common) - - -class Client(object): - - def __init__(self, iqn): - self.iqn = iqn - self.name = iqn.replace('.', '-') - self.luns = {} - self.lun_count = 0 - self._cycle = 0 - - def dump(self): - client_dump = {} - lun_info = {} - client_dump[self.name] = {"luns": {}, - "lun_count": self.lun_count} - for lun_name in self.luns: - lun = self.luns[lun_name] - lun_info.update(lun.dump()) - - return {self.name: {"luns": lun_info, - "lun_count": len(lun_info)} - } - - -class LUN(object): - - def __init__(self, client, tpg_lun): - self._path = tpg_lun.storage_object.path - self._tpg_lun = tpg_lun - self._name = tpg_lun.storage_object.name - self._display_name = tpg_lun.storage_object.name.replace('.', "-") - self._so = tpg_lun.storage_object - self._client = client - self._cycle = 0 - self.size = 0 - self.iops = 0 - self.read_bytes_per_sec = 0 - self.write_bytes_per_sec = 0 - self.total_bytes_per_sec = 0 - self.active_path = 0 - - def refresh(self, cycle_id): - self._cycle = cycle_id - self.size = self._so.size - stats_path = os.path.join(self._path, 'statistics/scsi_lu') - self.iops = int(common.fread(os.path.join(stats_path, "num_cmds"))) - read_mb = float(common.fread(os.path.join(stats_path, "read_mbytes"))) - write_mb = float( - common.fread(os.path.join(stats_path, "write_mbytes"))) - self.read_bytes_per_sec = int(read_mb * 1024 ** 2) - self.write_bytes_per_sec = 
int(write_mb * 1024 ** 2) - self.total_bytes_per_sec = self.read_bytes_per_sec + \ - self.write_bytes_per_sec - - if self._tpg_lun.alua_tg_pt_gp_name == 'ao': - self.active_path = 1 - else: - self.active_path = 0 - - def dump(self): - return {self._display_name: {k: getattr(self, k) for k in self.__dict__ - if not k.startswith("_")}} - - -class ISCSIGateway(base.BaseCollector): - """ - created on a host that has a /sys/kernel/config/target/iscsi dir - i.e. there is an iscsi gateway here! - """ - - metrics = { - "lun_count": ("lun_count", "gauge"), - "client_count": ("client_count", "gauge"), - "tpg_count": ("tpg_count", "gauge"), - "sessions": ("sessions", "gauge"), - "capacity": ("capacity", "gauge"), - "iops": ("iops", "derive"), - "read_bytes_per_sec": ("read_bytes_per_sec", "derive"), - "write_bytes_per_sec": ("write_bytes_per_sec", "derive"), - "total_bytes_per_sec": ("total_bytes_per_sec", "derive") - } - - def __init__(self, *args, **kwargs): - base.BaseCollector.__init__(self, *args, **kwargs) - - # Since the module can be imported by a parent class but not - # instantiated, the rtslib import is deferred until the first instance - # of the the class is created. 
This keeps the parent module simple - # and more importantly generic - if 'rtslib_fb.root' not in sys.modules.keys(): - - try: - import rtslib_fb.root as RTSRoot - except ImportError: - raise ImportError("rtslib_fb python package is missing") - - self._root = RTSRoot() - - self.clients = {} - self.cycle = 0 - - self.iops = 0 - self.read_bytes_per_sec = 0 - self.write_bytes_per_sec = 0 - self.total_bytes_per_sec = 0 - - def refresh(self): - """ - populate the instance by exploring rtslib - """ - - self.iops = 0 - self.read_bytes_per_sec = 0 - self.write_bytes_per_sec = 0 - self.total_bytes_per_sec = 0 - - if self.cycle == 10: - self.cycle = 0 - else: - self.cycle += 1 - - for node_acl in self._root.node_acls: - - client_name = node_acl.node_wwn - - if client_name not in self.clients: - new_client = Client(client_name) - self.clients[client_name] = new_client - - client = self.clients[client_name] - client.lun_count = 0 - client._cycle = self.cycle - - for lun in node_acl.mapped_luns: - client.lun_count += 1 - tpg_lun = lun.tpg_lun - lun_name = tpg_lun.storage_object.name - if lun_name not in client.luns: - lun = LUN(client, tpg_lun) - client.luns[lun._name] = lun - else: - lun = client.luns[lun_name] - - lun.refresh(self.cycle) - - self.iops += lun.iops - self.read_bytes_per_sec += lun.read_bytes_per_sec - self.write_bytes_per_sec += lun.write_bytes_per_sec - self.total_bytes_per_sec = self.read_bytes_per_sec + \ - self.write_bytes_per_sec - - def prune(self): - """ - drop child objects held by the instance, that are no longer in the - iSCSI config i.e. 
don't report on old information - """ - - for client_name in self.clients: - client = self.clients[client_name] - - for lun_name in client.luns: - lun = client.luns[lun_name] - if lun._cycle != self.cycle: - # drop the lun entry - self.logger.debug("pruning LUN '{}'".format(lun_name)) - - del client.luns[lun_name] - - if client._cycle != self.cycle: - # drop the client entry - self.logger.debug("pruning client '{}'".format(client_name)) - del self.clients[client_name] - - def dump(self): - - gw_stats = {} - client_stats = {} - - for metric in ISCSIGateway.metrics: - gw_stats[metric] = getattr(self, metric) - - for client_name in self.clients: - client = self.clients[client_name] - client_stats.update(client.dump()) - - return {"iscsi": { - "gw_name": {self.gateway_name: 0}, - "gw_stats": gw_stats, - "gw_clients": client_stats - } - } - - def _get_so(self): - return [so for so in self._root.storage_objects] - - def _get_node_acls(self): - return [node for node in self._root.node_acls] - - @property - def tpg_count(self): - return len([tpg for tpg in self._root.tpgs]) - - @property - def lun_count(self): - return len(self._get_so()) - - @property - def sessions(self): - return len([session for session in self._root.sessions]) - - @property - def gateway_name(self): - # Only the 1st gateway is considered/supported - gw_iqn = [gw.wwn for gw in self._root.targets][0] - return gw_iqn.replace('.', '-') - - @property - def client_count(self): - return len(self._get_node_acls()) - - @property - def capacity(self): - return sum([so.size for so in self._get_so()]) - - def get_stats(self): - - start = time.time() - - # populate gateway instance with the latest configuration from rtslib - self.refresh() - - # Overtime they'll be churn in client and disks so we need to drop - # any entries from prior runs that are no longer seen in the iscsi - # configuration with the prune method - self.prune() - - end = time.time() - - self.logger.info("LIO stats took {}s".format(end - start)) 
- - return self.dump() diff --git a/cephmetrics/collectors/mon.py b/cephmetrics/collectors/mon.py deleted file mode 100644 index 411af88..0000000 --- a/cephmetrics/collectors/mon.py +++ /dev/null @@ -1,431 +0,0 @@ -#!/usr/bin/env python - -import rados -import rbd -import json -import threading -import time -import logging - -from cephmetrics.collectors import (base, common) - - -class RBDScanner(threading.Thread): - - def __init__(self, cluster_name, pool_name): - self.cluster_name = cluster_name - self.pool_name = pool_name - self.num_rbds = 0 - self.logger = logging.getLogger('cephmetrics') - - threading.Thread.__init__(self) - - def run(self): - rbd_images = [] - conf_file = "/etc/ceph/{}.conf".format(self.cluster_name) - self.logger.debug("scan of '{}' starting".format(self.pool_name)) - with rados.Rados(conffile=conf_file) as cluster: - with cluster.open_ioctx(self.pool_name) as ioctx: - rbd_inst = rbd.RBD() - self.logger.debug("listing rbd's in {}".format(self.pool_name)) - rbd_images = rbd_inst.list(ioctx) - - self.logger.info("pool scan complete for '{}'".format(self.pool_name)) - self.num_rbds = len(rbd_images) - - -class Mon(base.BaseCollector): - - health = { - "HEALTH_OK": 0, - "HEALTH_WARN": 4, - "HEALTH_ERR": 8 - } - - osd_state = { - "up": 0, - "down": 1 - } - - # metrics are declared, where each element has a description and collectd - # data type. The description is used to ensure the names sent by collectd - # remain the same even if the source name changes in ceph. 
- cluster_metrics = { - "num_mon": ("num_mon", "gauge"), - "num_mon_quorum": ("num_mon_quorum", "gauge"), - "num_rbds": ("num_rbds", "gauge"), - "num_osd_hosts": ("num_osd_hosts", "gauge"), - "num_osd": ("num_osd", "gauge"), - "num_osd_up": ("num_osd_up", "gauge"), - "num_osd_in": ("num_osd_in", "gauge"), - "osd_epoch": ("osd_epoch", "gauge"), - "osd_bytes": ("osd_bytes", "gauge"), - "osd_bytes_used": ("osd_bytes_used", "gauge"), - "osd_bytes_avail": ("osd_bytes_avail", "gauge"), - "num_pool": ("num_pool", "gauge"), - "num_pg": ("num_pg", "gauge"), - "num_pg_active_clean": ("num_pg_active_clean", "gauge"), - "num_pg_active": ("num_pg_active", "gauge"), - "num_pg_peering": ("num_pg_peering", "gauge"), - "num_object": ("num_object", "gauge"), - "num_object_degraded": ("num_object_degraded", "gauge"), - "num_object_misplaced": ("num_object_misplaced", "gauge"), - "num_object_unfound": ("num_object_unfound", "gauge"), - "num_bytes": ("num_bytes", "gauge"), - "num_mds_up": ("num_mds_up", "gauge"), - "num_mds_in": ("num_mds_in", "gauge"), - "num_mds_failed": ("num_mds_failed", "gauge"), - "mds_epoch": ("mds_epoch", "gauge"), - "health": ("health", "gauge") - } - - pool_client_metrics = { - 'bytes_sec': ("bytes_sec", "gauge"), - 'op_per_sec': ("op_per_sec", "gauge"), - 'read_bytes_sec': ("read_bytes_sec", "gauge"), - 'write_op_per_sec': ("write_op_per_sec", "gauge"), - 'write_bytes_sec': ("write_bytes_sec", "gauge"), - 'read_op_per_sec': ("read_op_per_sec", "gauge") - } - - pool_recovery_metrics = { - "recovering_objects_per_sec": ("recovering_objects_per_sec", "gauge"), - "recovering_bytes_per_sec": ("recovering_bytes_per_sec", "gauge"), - "recovering_keys_per_sec": ("recovering_keys_per_sec", "gauge"), - "num_objects_recovered": ("num_objects_recovered", "gauge"), - "num_bytes_recovered": ("num_bytes_recovered", "gauge"), - "num_keys_recovered": ("num_keys_recovered", "gauge") - } - - osd_metrics = { - "status": ("status", "gauge") - } - - mon_states = { - "mon_status": 
("mon_status", "gauge") - } - - all_metrics = common.merge_dicts( - pool_recovery_metrics, pool_client_metrics) - all_metrics = common.merge_dicts(all_metrics, cluster_metrics) - all_metrics = common.merge_dicts(all_metrics, osd_metrics) - all_metrics = common.merge_dicts(all_metrics, mon_states) - - def __init__(self, *args, **kwargs): - base.BaseCollector.__init__(self, *args, **kwargs) - self.version = self._get_version() - if self.version < 12: - self.get_mon_health = self._mon_health - else: - self.get_mon_health = self._mon_health_new - - def _get_version(self): - vers_info = self._mon_command('version') - return int(vers_info['version'].replace('.', ' ').split()[2]) - - def _mon_command(self, cmd_request): - """ Issue a command to the monitor """ - - buf_s = '{}' - conf_file = "/etc/ceph/{}.conf".format(self.cluster_name) - - start = time.time() - with rados.Rados(conffile=conf_file) as cluster: - cmd = {'prefix': cmd_request, 'format': 'json'} - rc, buf_s, out = cluster.mon_command(json.dumps(cmd), b'') - end = time.time() - - self.logger.debug("_mon_command call '{}' :" - " {:.3f}s".format(cmd_request, - (end - start))) - - return json.loads(buf_s) - - @staticmethod - def get_feature_state(summary_data, pg_states): - """ - Look at the summary list to determine the state of RADOS features - :param summary_data: (list) summary data from a ceph health command - :return: (dict) dict indexed by feature - 0 Inactive, 1 Active, 2 Disabled - """ - feature_lookup = {"noscrub": "scrub", - "nodeep-scrub": "deep_scrub", - "norecover": "recovery", - "nobackfill": "backfill", - "norebalance": "rebalance", - "noout": "out", - "nodown": "down"} - - # Start with all features inactive i.e. 
enabled - feature_state = {feature_lookup.get(key): 0 for key in feature_lookup} - - for summary in summary_data: - summary_desc = summary.get('summary') - if "flag(s) set" in summary_desc: - flags = summary_desc.replace(' flag(s) set', '').split(',') - for disabled_feature in flags: - if disabled_feature in feature_lookup: - feature = feature_lookup.get(disabled_feature) - feature_state[feature] = 2 # feature disabled - - # Now use the current pg state names to determine whether a feature is - # active - if not it stays set to '0', which means inactive - pg_state_names = [pg_state.get('name') for pg_state in pg_states] - for pg_state in pg_state_names: - states = pg_state.split('+') - if 'recovering' in states: - feature_state['recovery'] = 1 # Active - continue - if 'backfilling' in states: - feature_state['backfill'] = 1 - continue - if 'deep' in states: - feature_state['deep_scrub'] = 1 - continue - if 'scrubbing' in states: - feature_state['scrub'] = 1 - - return feature_state - - @classmethod - def check_stuck_pgs(cls, summary_list): - bad_pg_words = ['pgs', 'stuck', 'inactive'] - stuck_pgs = 0 - for summary_data in summary_list: - if summary_data.get('severity') != 'HEALTH_ERR': - continue - if all(trigger in summary_data.get('summary') - for trigger in bad_pg_words): - stuck_pgs = int(summary_data.get('summary').split()[0]) - - return stuck_pgs - - def _mon_health_new(self): - - cluster, health_data = self._mon_health_common() - - mon_status_output = self._mon_command('mon_status') - quorum_list = mon_status_output.get('quorum') - mon_list = mon_status_output.get('monmap').get('mons') - mon_status = {} - for mon in mon_list: - state = 0 if mon.get('rank') in quorum_list else 4 - mon_status[mon.get('name')] = state - - cluster['mon_status'] = mon_status - - return cluster - - def _mon_health_common(self): - - # for v12 (Luminous and beyond) add the following setting to - # ceph.conf "mon_health_preluminous_compat=true" - # this will provide the same output 
as pre-luminous - - cluster_data = self._admin_socket().get('cluster') - pg_data = self._mon_command("pg stat") - health_data = self._mon_command("health") - health_text = health_data.get('overall_status', - health_data.get('status', '')) - - cluster = {Mon.cluster_metrics[k][0]: cluster_data[k] - for k in cluster_data} - - health_num = Mon.health.get(health_text, 16) - - cluster['health'] = health_num - - pg_states = pg_data.get('num_pg_by_state') # list of dict name,num - health_summary = health_data.get('summary', []) # list of issues - cluster['num_pgs_stuck'] = Mon.check_stuck_pgs(health_summary) - cluster['features'] = Mon.get_feature_state(health_summary, - pg_states) - - self.logger.debug( - 'Features:{}'.format(json.dumps(cluster['features']))) - - return cluster, health_data - - def _mon_health(self): - - cluster, health_data = self._mon_health_common() - - services = health_data.get('health').get('health_services') - monstats = {} - for svc in services: - if 'mons' in svc: - # Each monitor will have a numeric value denoting health - monstats = { mon.get('name'): Mon.health.get(mon.get('health')) - for mon in svc.get('mons')} - - cluster['mon_status'] = monstats - - return cluster - - @classmethod - def _seed(cls, metrics): - return {metrics[key][0]: 0 for key in metrics} - - def display_names(self, metric_format, metrics): - """ - convert the keys to the static descriptions - :return: - """ - return {metric_format[k][0]: metrics[k] - for k in metrics} if metrics else {} - - def _get_pool_stats(self): - """ get pool stats from rados """ - - raw_stats = self._mon_command('osd pool stats') - pool_stats = {} - - # process each pool - for pool in raw_stats: - - pool_name = pool['pool_name'].replace('.', '_') - client_io = self.display_names(Mon.pool_client_metrics, - pool.get('client_io_rate')) - recovery = self.display_names(Mon.pool_recovery_metrics, - pool.get('recovery_rate')) - - pool_md = {} - if client_io: - - # Add pool level aggregation - 
client_io['bytes_sec'] = client_io.get('read_bytes_sec', 0) + \ - client_io.get('write_bytes_sec', 0) - client_io["op_per_sec"] = client_io.get('read_op_per_sec', 0)+ \ - client_io.get('write_op_per_sec', 0) - pool_md = client_io - - else: - pool_md = Mon._seed(Mon.pool_client_metrics) - - if recovery: - pool_md = common.merge_dicts(pool_md, recovery) - else: - pool_md = common.merge_dicts(pool_md, Mon._seed( - Mon.pool_recovery_metrics)) - - pool_stats[pool_name] = pool_md - - return pool_stats - - def _get_osd_states(self): - - self.logger.debug("fetching osd states from the local mon") - raw = self._mon_command('osd dump') - osd_hosts = set() - osds = {} - for osd in raw.get('osds'): - cluster_addr = osd.get('cluster_addr').split(':')[0] - osd_hosts.add(cluster_addr) - - # NB. The key for the osds dict must be a string as the dict is - # flattened when the metric name is derived in the parent collectd - # module. If it is not converted, you get a TypeError - osds[str(osd.get('osd'))] = {"up": osd.get('up'), - "in": osd.get('in')} - - return len(osd_hosts), osds - - @staticmethod - def _select_pools(pools, mons): - """ - determine the pools this mon should scan based on it's name. We select - pools from the an offset into the pool list, and then repeat at an - interval set by # mons in the configuration. This splits up the pools - we have, so each mon looks at a discrete set of pools instead of all - mons performing all scans. - :param pools: (list) rados pool names - :param mons: (list) monitor names from ceph health - :return: (list) of pools this monitor should scan. empty list if the - monitor name mismatches - so no scans done - """ - - pools_to_scan = [] - - try: - freq = mons.index(common.get_hostname()) - except ValueError: - # this host's name is not in the monitor list? 
- # twilight zone moment - pass - else: - - pools_to_scan = [pools[ptr] - for ptr in xrange(freq, len(pools), len(mons))] - - return pools_to_scan - - def get_pools(self): - skip_pools = ('default.rgw') - - start = time.time() - conf_file = "/etc/ceph/{}.conf".format(self.cluster_name) - with rados.Rados(conffile=conf_file) as cluster: - rados_pools = sorted(cluster.list_pools()) - end = time.time() - - self.logger.debug('lspools took {:.3f}s'.format(end - start)) - - filtered_pools = [pool for pool in rados_pools - if not pool.startswith(skip_pools)] - - return filtered_pools - - def _get_rbds(self, monitors): - - pool_list = self.get_pools() - mon_list = sorted(monitors.keys()) - my_pools = Mon._select_pools(pool_list, mon_list) - self.logger.debug("Pools to be scanned on this mon" - " : {}".format(','.join(my_pools))) - threads = [] - - start = time.time() - - for pool in my_pools: - thread = RBDScanner(self.cluster_name, pool) - thread.start() - threads.append(thread) - - # wait for all threads to complete - for thread in threads: - thread.join(1) - - end = time.time() - self.logger.debug("rbd scans {:.3f}s".format((end - start))) - - total_rbds = sum([thread.num_rbds for thread in threads]) - self.logger.debug("total rbds found : {}".format(total_rbds)) - - for thread in threads: - del thread - - return total_rbds - - def get_stats(self): - """ - method associated with the plugin callback to gather the metrics - :return: (dict) metadata describing the state of the mon/osd's - """ - - start = time.time() - - pool_stats = self._get_pool_stats() - num_osd_hosts, osd_states = self._get_osd_states() - cluster_state = self.get_mon_health() - cluster_state['num_osd_hosts'] = num_osd_hosts - cluster_state['num_rbds'] = self._get_rbds(cluster_state['mon_status']) - - all_stats = common.merge_dicts(cluster_state, {"pools": pool_stats, - "osd_state": osd_states}) - - end = time.time() - self.logger.info("mon get_stats call : {:.3f}s".format((end - start))) - - return 
{"mon": all_stats} - diff --git a/cephmetrics/collectors/osd.py b/cephmetrics/collectors/osd.py deleted file mode 100644 index c95727b..0000000 --- a/cephmetrics/collectors/osd.py +++ /dev/null @@ -1,332 +0,0 @@ -#!/usr/bin/env python - -import os -import time -import math - -from cephmetrics.collectors import (base, common) - -__author__ = "Paul Cuzner" - - -class OSDstats(object): - - osd_capacity = { - "stat_bytes": ("stat_bytes", "gauge"), - "stat_bytes_used": ("stat_bytes_used", "gauge"), - "stat_bytes_avail": ("stat_bytes_avail", "gauge") - } - - filestore_metrics = { - "journal_latency", - "commitcycle_latency", - "apply_latency", - "queue_transaction_latency_avg" - } - - def __init__(self, osd_type='filestore'): - self._current = {} - self._previous = {} - self._osd_type = osd_type - self.osd_percent_used = 0 - - def update(self, stats): - """ - update the objects attributes based on the dict - :param stats: (dict) containing filestore performance ('filestore') - and capacity info ('osd') - :return: None - """ - - if self._current: - self._previous = self._current - self._current = stats['filestore'] - else: - self._current = stats['filestore'] - - for attr in OSDstats.filestore_metrics: - - if self._previous: - d_sum = self._current[attr].get('sum') - \ - self._previous[attr].get('sum') - d_avgcount = self._current[attr].get('avgcount') - \ - self._previous[attr].get('avgcount') - - if d_sum == 0 or d_avgcount == 0: - val = 0 - else: - val = float(d_sum) / d_avgcount - else: - # no previous value, so set to 0 - val = 0 - - setattr(self, attr, val) - - for attr in stats['osd']: - setattr(self, attr, stats['osd'].get(attr)) - - self.osd_percent_used = math.ceil((float(self.stat_bytes_used) / - self.stat_bytes) * 100) - - -class OSDs(base.BaseCollector): - - all_metrics = common.merge_dicts( - common.Disk.metrics, common.IOstat.metrics) - - def __init__(self, cluster_name, **kwargs): - base.BaseCollector.__init__(self, cluster_name, **kwargs) - self.timestamp 
= int(time.time()) - - self.osd = {} # dict of disk objects, each disk contains osd_id - self.jrnl = {} # dict of journal devices (if not collocated) - self.osd_id_list = [] - self.dev_lookup = {} # dict dev_name -> osd | jrnl - self.osd_count = 0 - - def __repr__(self): - - s = '' - for disk in self.osd: - s += "{}\n".format(disk) - dev = self.osd[disk] - - for var in vars(dev): - if not var.startswith('_'): - s += "{} ... {}\n".format(var, getattr(dev, var)) - return s - - def _fetch_osd_stats(self, osd_id): - - # NB: osd stats are cumulative - - stats = {} - osd_socket_name = '/var/run/ceph/{}-osd.{}.asok'.format( - self.cluster_name, osd_id) - - if not os.path.exists(osd_socket_name): - # all OSD's should expose an admin socket, so if it's missing - # this node has a problem! - raise IOError("Socket file missing for OSD {}".format(osd_id)) - - self.logger.debug("fetching osd stats for osd {}".format(osd_id)) - resp = self._admin_socket(socket_path=osd_socket_name) - - filestore_stats = resp.get('filestore') - stats['filestore'] = {key_name: filestore_stats.get(key_name) - for key_name in OSDstats.filestore_metrics} - - osd_stats = resp.get('osd') - - # Add disk usage stats - stats['osd'] = {key_name: osd_stats.get(key_name) - for key_name in OSDstats.osd_capacity.keys()} - - return stats - - @staticmethod - def get_osd_type(osd_path): - - osd_type_fname = os.path.join(osd_path, 'type') - if os.path.exists(osd_type_fname): - return common.fread(osd_type_fname) - else: - if os.path.exists(os.path.join(osd_path, 'journal')): - return "filestore" - else: - raise ValueError("Unrecognised OSD type") - - def _dev_to_osd(self): - """ - Look at the system to determine which disks are acting as OSD's - """ - - # the logic here uses the mount points to determine which OSD's are - # in the system. The encryption state is determine just by the use - # devicemapper (i.e. /dev/mapper prefixed devices) - since at this time - # this is all dm is used for. 
- - osd_indicators = {'var', 'lib', 'osd'} - - for mnt in common.freadlines('/proc/mounts'): - items = mnt.split(' ') - dev_path, path_name = items[:2] - if path_name.startswith('/var/lib'): - # take a close look since this is where ceph osds usually - # get mounted - dirs = set(path_name.split('/')) - if dirs.issuperset(osd_indicators): - - # get the osd_id from the name is the most simple way - # to get the id, due to naming conventions. If this fails - # though, plan 'b' is the whoami file - osd_id = path_name.split('-')[-1] - if not osd_id.isdigit(): - osd_id = common.fread( - os.path.join(path_name, 'whoami')) - - if osd_id not in self.osd: - osd_type = OSDs.get_osd_type(path_name) - self.osd[osd_id] = OSDstats(osd_type=osd_type) - self.osd_id_list.append(osd_id) - - osd_type = self.osd[osd_id]._osd_type - if osd_type == 'filestore': - if dev_path.startswith('/dev/mapper'): - encrypted = 1 - uuid = dev_path.split('/')[-1] - partuuid = '/dev/disk/by-partuuid/{}'.format( - uuid) - dev_path = os.path.realpath(partuuid) - osd_device = dev_path.split('/')[-1] - else: - encrypted = 0 - osd_device = dev_path.split('/')[-1] - - elif osd_type == 'bluestore': - block_link = os.path.join(path_name, 'block') - osd_path = os.path.realpath(block_link) - osd_device = osd_path.split('/')[-1] - encrypted = 0 - else: - raise ValueError("Unknown OSD type encountered") - - # if the osd_id hasn't been seem neither has the - # disk - self.osd[osd_device] = common.Disk( - osd_device, - path_name=path_name, - osd_id=osd_id, - in_osd_type=osd_type, - encrypted=encrypted) - self.dev_lookup[osd_device] = 'osd' - self.osd_count += 1 - - if osd_type == 'filestore': - journal_link = os.path.join(path_name, 'journal') - else: - journal_link = os.path.join(path_name, 'block.wal') - - if os.path.exists(journal_link): - link_tgt = os.readlink(journal_link) - if link_tgt.startswith('/dev/mapper'): - encrypted = 1 - else: - encrypted = 0 - - partuuid_path = os.path.join( - 
'/dev/disk/by-partuuid', - link_tgt.split('/')[-1]) - jrnl_path = os.path.realpath(partuuid_path) - jrnl_dev = jrnl_path.split('/')[-1] - - if jrnl_dev not in self.osd: - self.jrnl[jrnl_dev] = common.Disk( - jrnl_dev, - osd_id=osd_id, - in_osd_type=osd_type, - encrypted=encrypted) - - self.dev_lookup[jrnl_dev] = 'jrnl' - - else: - # No journal or WAL link..? - pass - - def _stats_lookup(self): - """ - Grab the disk stats from /proc/diskstats, and the key osd perf dump - counters - """ - - now = time.time() - interval = int(now) - self.timestamp - self.timestamp = int(now) - - # Fetch diskstats from the OS - for perf_entry in common.freadlines('/proc/diskstats'): - - field = perf_entry.split() - dev_name = field[2] - - device = None - if self.dev_lookup.get(dev_name, None) == 'osd': - device = self.osd[dev_name] - elif self.dev_lookup.get(dev_name, None) == 'jrnl': - device = self.jrnl[dev_name] - - if device: - new_stats = field[3:] - - if device.perf._current: - device.perf._previous = device.perf._current - device.perf._current = new_stats - else: - device.perf._current = new_stats - - device.perf.compute(interval) - device.refresh() - - end = time.time() - self.logger.debug("OS disk stats calculated in " - "{:.4f}s".format(end-now)) - - # fetch stats from each osd daemon - osd_stats_start = time.time() - for osd_id in self.osd_id_list: - - if self.osd[osd_id]._osd_type == 'filestore': - osd_stats = self._fetch_osd_stats(osd_id) - - # self.logger.debug('stats : {}'.format(osd_stats)) - - osd_device = self.osd[osd_id] - osd_device.update(osd_stats) - else: - self.logger.debug("skipped 'bluestore' osd perf collection " - "for osd.{}".format(osd_id)) - - osd_stats_end = time.time() - self.logger.debug( - "OSD perf dump stats collected for {} OSDs in {:.3f}s".format( - len(self.osd_id_list), (osd_stats_end - osd_stats_start))) - - @staticmethod - def _dump_devs(device_dict): - - dumped = {} - - for dev_name in sorted(device_dict): - device = device_dict[dev_name] - 
dumped[dev_name] = common.todict(device) - - return dumped - - def dump(self): - """ - dump the osd object(s) to a dict. The object *must* not have references - to other objects - if this rule is broken cephmetrics caller will fail - when parsing the dict - - :return: (dict) dictionary representation of this OSDs on this host - """ - - return { - "num_osds": self.osd_count, - "osd": OSDs._dump_devs(self.osd), - "jrnl": OSDs._dump_devs(self.jrnl) - } - - def get_stats(self): - - start = time.time() - - self._dev_to_osd() - self._stats_lookup() - - end = time.time() - - self.logger.info("osd get_stats call " - ": {:.3f}s".format((end - start))) - - return self.dump() diff --git a/cephmetrics/collectors/rgw.py b/cephmetrics/collectors/rgw.py deleted file mode 100644 index 5dbdab3..0000000 --- a/cephmetrics/collectors/rgw.py +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/bin/env python - -import time - -from cephmetrics.collectors import (base, common) - -__author__ = "paul.cuzner@redhat.com" - - -class RGW(base.BaseCollector): - - simple_metrics = { - "req": ("requests", "derive"), - "failed_req": ("requests_failed", "derive"), - "get": ("gets", "derive"), - "get_b": ("get_bytes", "derive"), - "put": ("puts", "derive"), - "put_b": ("put_bytes", "derive"), - "qlen": ("qlen", "derive"), - "qactive": ("requests_active", "derive") - } - - int_latencies = [ - "get_initial_lat", - "put_initial_lat" - ] - - latencies = { - "get_initial_lat_sum": ("get_initial_lat_sum", "derive"), - "get_initial_lat_avgcount": ("get_initial_lat_avgcount", "derive"), - "put_initial_lat_sum": ("put_initial_lat_sum", "derive"), - "put_initial_lat_avgcount": ("put_initial_lat_avgcount", "derive") - } - - all_metrics = common.merge_dicts(simple_metrics, latencies) - - def __init__(self, cluster_name, admin_socket, **kwargs): - base.BaseCollector.__init__(self, cluster_name, admin_socket, **kwargs) - self.host_name = common.get_hostname() - - def _get_rgw_data(self): - - response = self._admin_socket() - - 
key_name = 'client.rgw.{}'.format(self.host_name) - - return response.get(key_name) - - def _filter(self, stats): - # pick out the simple metrics - - filtered = {key: stats[key] for key in RGW.simple_metrics} - - for key in RGW.int_latencies: - for _attr in stats[key]: - new_key = "{}_{}".format(key, _attr) - filtered[new_key] = stats[key].get(_attr) - - return filtered - - def get_stats(self): - - start = time.time() - - raw_stats = self._get_rgw_data() - - stats = self._filter(raw_stats) - - end = time.time() - - self.logger.info("RGW get_stats : {:.3f}s".format((end - start))) - - return {"rgw": stats} diff --git a/cephmetrics_collectors/__init__.py b/cephmetrics_collectors/__init__.py new file mode 100644 index 0000000..a5abc98 --- /dev/null +++ b/cephmetrics_collectors/__init__.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +import glob +import os +import sys + +# Remove collectd's plugin dir from sys.path to avoid namespace collisions with +# .so files that are not actually Python modules +sys.path = [item for item in sys.path if not item.endswith('/collectd')] + +from . import (common, iscsi, mon, osd, rgw) + + +class Ceph(object): + def __init__(self): + self.cluster_name = None + self.host_name = common.get_hostname() + + self.mon_socket = None + self.rgw_socket = None + + self.mon = None + self.rgw = None + self.osd = None + self.iscsi = None + + def probe(self): + """ + set up which collector(s) to use, based on what types of sockets we + find in /var/run/ceph + """ + + mon_socket = '/var/run/ceph/{}-mon.{}.asok'.format(self.cluster_name, + self.host_name) + if os.path.exists(mon_socket): + self.mon_socket = mon_socket + self.mon = mon.Mon(self.cluster_name, admin_socket=mon_socket) + + rgw_socket_list = glob.glob('/var/run/ceph/{}-client.rgw.*.' 
+ 'asok'.format(self.cluster_name)) + + if rgw_socket_list: + rgw_socket = rgw_socket_list[0] + self.rgw = rgw.RGW(self.cluster_name, admin_socket=rgw_socket) + + osd_socket_list = glob.glob('/var/run/ceph/{}-osd.*' + '.asok'.format(self.cluster_name)) + mounted = common.freadlines('/proc/mounts') + osds_mounted = [mnt for mnt in mounted + if mnt.split()[1].startswith('/var/lib/ceph')] + if osd_socket_list or osds_mounted: + self.osd = osd.OSDs(self.cluster_name) + + if os.path.exists('/sys/kernel/config/target/iscsi'): + self.iscsi = iscsi.ISCSIGateway(self.cluster_name) diff --git a/cephmetrics_collectors/base.py b/cephmetrics_collectors/base.py new file mode 100644 index 0000000..f2f2295 --- /dev/null +++ b/cephmetrics_collectors/base.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python + +import json +import time +import logging + +from ceph_daemon import admin_socket + + +class BaseCollector(object): + + def __init__(self, cluster_name, admin_socket=None): + self.cluster_name = cluster_name + self.admin_socket = admin_socket + + self.logger = logging.getLogger('cephmetrics') + + def _admin_socket(self, cmds=None, socket_path=None): + + adm_socket = self.admin_socket if not socket_path else socket_path + + if not cmds: + cmds = ['perf', 'dump'] + + start = time.time() + response = admin_socket(adm_socket, cmds, + format='json') + end = time.time() + + self.logger.debug("admin_socket call '{}' : " + "{:.3f}s".format(' '.join(cmds), + (end - start))) + + return json.loads(response) + + def get_stats(self): + + return {} diff --git a/cephmetrics_collectors/common.py b/cephmetrics_collectors/common.py new file mode 100644 index 0000000..8b94dcc --- /dev/null +++ b/cephmetrics_collectors/common.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python + + +import socket +from os import statvfs +import math + + +def get_hostname(): + return socket.gethostname().split('.')[0] + + +def add_dicts(dict1, dict2): + """ + Add dictionary values together + :param dict1: + :param dict2: + :return: 
dict with matching fields sum'd together + """ + return {key: dict1.get(key, 0) + dict2.get(key, 0) + for key in set(dict1).union(dict2)} + + +def merge_dicts(dict1, dict2): + """ + merges two dicts together to form a single dict. when dict keys overlap + the value in the 2nd dict takes precedence + :param dict1: + :param dict2: + :return: combined dict + """ + + new = dict1.copy() + new.update(dict2) + + return new + + +def flatten_dict(data, separator='.', prefix=''): + """ + flatten a dict, so it is just simple key/value pairs + :param data: (dict) + :param separator: (str) char to use when combining keys + :param prefix: key prefix + :return: + """ + return {prefix + separator + k if prefix else k: v + for kk, vv in data.items() + for k, v in flatten_dict(vv, separator, kk).items() + } if isinstance(data, dict) else {prefix: data} + + +def todict(obj): + """ + convert an object to a dict representation + :param obj: (object) object to examine, to extract variables/values from + :return: (dict) representation of the given object + """ + data = {} + for key, value in obj.__dict__.iteritems(): + + if key.startswith('_'): + continue + + try: + data[key] = todict(value) + except AttributeError: + data[key] = value + + return data + + +def fread(file_name=None): + """ + Simple read function for files of a single value + :param file_name: (str) file name to read + :return: (str) contents of the file + """ + + with open(file_name, 'r') as f: + setting = f.read().rstrip() + return setting + + +def freadlines(file_name=None): + """ + simple readlines function to return all records of a given file + :param file_name: (str) file name to read + :return: (list) contents of the file + """ + + with open(file_name, 'r') as f: + data = f.readlines() + return data + + +class IOstat(object): + raw_metrics = [ + "_reads", + "_reads_mrgd", + "_sectors_read", + "_read_ms", + "_writes", + "_writes_mrgd", + "_sectors_written", + "_write_ms", + "_current_io", + "_ms_active_io", + 
"_ms_active_io_w" + ] + + sector_size = 512 + + metrics = { + "iops": ("iops", "gauge"), + "r_iops": ("r_iops", "gauge"), + "w_iops": ("w_iops", "gauge"), + "bytes_per_sec": ("bytes_per_sec", "gauge"), + "r_bytes_per_sec": ("r_bytes_per_sec", "gauge"), + "w_bytes_per_sec": ("w_bytes_per_sec", "gauge"), + "util": ("util", "gauge"), + "await": ("await", "gauge"), + "r_await": ("r_await", "gauge"), + "w_await": ("w_await", "gauge"), + } + + def __init__(self): + self._previous = [] + self._current = [] + + # Seed the metrics we're interested in + for ctr in IOstat.metrics.keys(): + setattr(self, ctr, 0) + + def __str__(self): + s = '\n- IOstat object:\n' + for key in sorted(vars(self)): + s += '\t{} ... {}\n'.format(key, getattr(self, key)) + return s + + def _calc_raw_delta(self): + if not self._previous: + # nothing to compute yet + for ptr in range(len(IOstat.raw_metrics)): + key = IOstat.raw_metrics[ptr] + setattr(self, key, 0) + else: + for ptr in range(len(IOstat.raw_metrics)): + key = IOstat.raw_metrics[ptr] + setattr(self, key, (int(self._current[ptr]) - + int(self._previous[ptr]))) + + def compute(self, sample_interval): + """ + Calculate the iostats for this device + """ + + self._calc_raw_delta() + + if sample_interval > 0: + interval_ms = sample_interval * 1000 + total_io = self._reads + self._writes + self.util = float(self._ms_active_io) / interval_ms * 100 + self.iops = int(total_io) / sample_interval + self.r_iops = int(self._reads) / sample_interval + self.w_iops = int(self._writes) / sample_interval + self.await = (float(self._write_ms + self._read_ms) / total_io if + total_io > 0 else 0) + self.w_await = float( + self._write_ms) / self._writes if self._writes > 0 else 0 + self.r_await = float( + self._read_ms) / self._reads if self._reads > 0 else 0 + self.r_bytes_per_sec = (float( + self._sectors_read * IOstat.sector_size)) / sample_interval + self.w_bytes_per_sec = (float( + self._sectors_written * IOstat.sector_size)) / sample_interval + 
self.bytes_per_sec = self.r_bytes_per_sec + self.w_bytes_per_sec + + +class Disk(object): + + metrics = { + "rotational": ("rotational", "gauge"), + "disk_size": ("disk_size", "gauge"), + "fs_size": ("fs_size", "gauge"), + "fs_used": ("fs_used", "gauge"), + "fs_percent_used": ("fs_percent_used", "gauge"), + "osd_id": ("osd_id", "gauge") + } + + osd_types = {"filestore": 0, "bluestore": 1} + + def __init__(self, device_name, path_name=None, osd_id=None, + in_osd_type="filestore", encrypted=0): + + self._name = device_name + self._path_name = path_name + self._base_dev = Disk.get_base_dev(device_name) + self.osd_id = osd_id + + self.rotational = self._get_rota() + self.disk_size = self._get_size() + self.perf = IOstat() + self.fs_size = 0 + self.fs_percent_used = 0 + self.fs_used = 0 + self.encrypted = encrypted + self.osd_type = Disk.osd_types[in_osd_type] + + self.refresh() + + def _get_size(self): + return int(fread("/sys/block/{}/size".format(self._base_dev))) * 512 + + def _get_rota(self): + return int( + fread("/sys/block/{}/queue/rotational".format(self._base_dev))) + + def _get_fssize(self): + s = statvfs("{}/whoami".format(self._path_name)) + fs_size = s.f_blocks * s.f_bsize + fs_used = fs_size - (s.f_bfree * s.f_bsize) + fs_percent_used = math.ceil((float(fs_used) / fs_size) * 100) + return fs_size, fs_used, fs_percent_used + + def refresh(self): + # only run the fs size update, if the _path_name is set. + if self._path_name: + self.fs_size, self.fs_used, self.fs_percent_used = \ + self._get_fssize() + + @staticmethod + def get_base_dev(dev_name): + + # for intelcas devices, just use the device name as is + if dev_name.startswith('intelcas'): + device = dev_name + elif dev_name.startswith('nvme'): + if 'p' in dev_name: + device = dev_name[:(dev_name.index('p'))] + else: + device = dev_name + else: + # default strip any numeric ie. 
sdaa1 -> sdaa + device = filter(lambda ch: ch.isalpha(), dev_name) + + return device diff --git a/cephmetrics_collectors/iscsi.py b/cephmetrics_collectors/iscsi.py new file mode 100644 index 0000000..c93c60f --- /dev/null +++ b/cephmetrics_collectors/iscsi.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python2 + +# requires python-rtslib_fb for LIO interaction +# +# NB. the rtslib_fb module is dynamically loaded by the ISCSIGateway +# class instantiation. This prevents import errors within the generic parent +# module cephmetrics +# +import os +import sys +import time + +from cephmetrics_collectors import (base, common) + + +class Client(object): + + def __init__(self, iqn): + self.iqn = iqn + self.name = iqn.replace('.', '-') + self.luns = {} + self.lun_count = 0 + self._cycle = 0 + + def dump(self): + client_dump = {} + lun_info = {} + client_dump[self.name] = {"luns": {}, + "lun_count": self.lun_count} + for lun_name in self.luns: + lun = self.luns[lun_name] + lun_info.update(lun.dump()) + + return {self.name: {"luns": lun_info, + "lun_count": len(lun_info)} + } + + +class LUN(object): + + def __init__(self, client, tpg_lun): + self._path = tpg_lun.storage_object.path + self._tpg_lun = tpg_lun + self._name = tpg_lun.storage_object.name + self._display_name = tpg_lun.storage_object.name.replace('.', "-") + self._so = tpg_lun.storage_object + self._client = client + self._cycle = 0 + self.size = 0 + self.iops = 0 + self.read_bytes_per_sec = 0 + self.write_bytes_per_sec = 0 + self.total_bytes_per_sec = 0 + self.active_path = 0 + + def refresh(self, cycle_id): + self._cycle = cycle_id + self.size = self._so.size + stats_path = os.path.join(self._path, 'statistics/scsi_lu') + self.iops = int(common.fread(os.path.join(stats_path, "num_cmds"))) + read_mb = float(common.fread(os.path.join(stats_path, "read_mbytes"))) + write_mb = float( + common.fread(os.path.join(stats_path, "write_mbytes"))) + self.read_bytes_per_sec = int(read_mb * 1024 ** 2) + self.write_bytes_per_sec = 
int(write_mb * 1024 ** 2) + self.total_bytes_per_sec = self.read_bytes_per_sec + \ + self.write_bytes_per_sec + + if self._tpg_lun.alua_tg_pt_gp_name == 'ao': + self.active_path = 1 + else: + self.active_path = 0 + + def dump(self): + return {self._display_name: {k: getattr(self, k) for k in self.__dict__ + if not k.startswith("_")}} + + +class ISCSIGateway(base.BaseCollector): + """ + created on a host that has a /sys/kernel/config/target/iscsi dir + i.e. there is an iscsi gateway here! + """ + + metrics = { + "lun_count": ("lun_count", "gauge"), + "client_count": ("client_count", "gauge"), + "tpg_count": ("tpg_count", "gauge"), + "sessions": ("sessions", "gauge"), + "capacity": ("capacity", "gauge"), + "iops": ("iops", "derive"), + "read_bytes_per_sec": ("read_bytes_per_sec", "derive"), + "write_bytes_per_sec": ("write_bytes_per_sec", "derive"), + "total_bytes_per_sec": ("total_bytes_per_sec", "derive") + } + + def __init__(self, *args, **kwargs): + base.BaseCollector.__init__(self, *args, **kwargs) + + # Since the module can be imported by a parent class but not + # instantiated, the rtslib import is deferred until the first instance + # of the the class is created. 
This keeps the parent module simple + # and more importantly generic + if 'rtslib_fb.root' not in sys.modules.keys(): + + try: + import rtslib_fb.root as RTSRoot + except ImportError: + raise ImportError("rtslib_fb python package is missing") + + self._root = RTSRoot() + + self.clients = {} + self.cycle = 0 + + self.iops = 0 + self.read_bytes_per_sec = 0 + self.write_bytes_per_sec = 0 + self.total_bytes_per_sec = 0 + + def refresh(self): + """ + populate the instance by exploring rtslib + """ + + self.iops = 0 + self.read_bytes_per_sec = 0 + self.write_bytes_per_sec = 0 + self.total_bytes_per_sec = 0 + + if self.cycle == 10: + self.cycle = 0 + else: + self.cycle += 1 + + for node_acl in self._root.node_acls: + + client_name = node_acl.node_wwn + + if client_name not in self.clients: + new_client = Client(client_name) + self.clients[client_name] = new_client + + client = self.clients[client_name] + client.lun_count = 0 + client._cycle = self.cycle + + for lun in node_acl.mapped_luns: + client.lun_count += 1 + tpg_lun = lun.tpg_lun + lun_name = tpg_lun.storage_object.name + if lun_name not in client.luns: + lun = LUN(client, tpg_lun) + client.luns[lun._name] = lun + else: + lun = client.luns[lun_name] + + lun.refresh(self.cycle) + + self.iops += lun.iops + self.read_bytes_per_sec += lun.read_bytes_per_sec + self.write_bytes_per_sec += lun.write_bytes_per_sec + self.total_bytes_per_sec = self.read_bytes_per_sec + \ + self.write_bytes_per_sec + + def prune(self): + """ + drop child objects held by the instance, that are no longer in the + iSCSI config i.e. 
don't report on old information + """ + + for client_name in self.clients: + client = self.clients[client_name] + + for lun_name in client.luns: + lun = client.luns[lun_name] + if lun._cycle != self.cycle: + # drop the lun entry + self.logger.debug("pruning LUN '{}'".format(lun_name)) + + del client.luns[lun_name] + + if client._cycle != self.cycle: + # drop the client entry + self.logger.debug("pruning client '{}'".format(client_name)) + del self.clients[client_name] + + def dump(self): + + gw_stats = {} + client_stats = {} + + for metric in ISCSIGateway.metrics: + gw_stats[metric] = getattr(self, metric) + + for client_name in self.clients: + client = self.clients[client_name] + client_stats.update(client.dump()) + + return {"iscsi": { + "gw_name": {self.gateway_name: 0}, + "gw_stats": gw_stats, + "gw_clients": client_stats + } + } + + def _get_so(self): + return [so for so in self._root.storage_objects] + + def _get_node_acls(self): + return [node for node in self._root.node_acls] + + @property + def tpg_count(self): + return len([tpg for tpg in self._root.tpgs]) + + @property + def lun_count(self): + return len(self._get_so()) + + @property + def sessions(self): + return len([session for session in self._root.sessions]) + + @property + def gateway_name(self): + # Only the 1st gateway is considered/supported + gw_iqn = [gw.wwn for gw in self._root.targets][0] + return gw_iqn.replace('.', '-') + + @property + def client_count(self): + return len(self._get_node_acls()) + + @property + def capacity(self): + return sum([so.size for so in self._get_so()]) + + def get_stats(self): + + start = time.time() + + # populate gateway instance with the latest configuration from rtslib + self.refresh() + + # Overtime they'll be churn in client and disks so we need to drop + # any entries from prior runs that are no longer seen in the iscsi + # configuration with the prune method + self.prune() + + end = time.time() + + self.logger.info("LIO stats took {}s".format(end - start)) 
class RBDScanner(threading.Thread):
    """Background worker that counts the rbd images in a single pool.

    The result is left in ``self.num_rbds`` for the caller to read after
    joining the thread; it stays 0 if the scan fails.
    """

    def __init__(self, cluster_name, pool_name):
        threading.Thread.__init__(self)
        self.cluster_name = cluster_name
        self.pool_name = pool_name
        self.num_rbds = 0  # scan result; remains 0 on failure
        self.logger = logging.getLogger('cephmetrics')

    def run(self):
        """Connect to the cluster and count the rbd images in our pool."""
        rbd_images = []
        conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
        self.logger.debug("scan of '{}' starting".format(self.pool_name))
        try:
            with rados.Rados(conffile=conf_file) as cluster:
                with cluster.open_ioctx(self.pool_name) as ioctx:
                    rbd_inst = rbd.RBD()
                    self.logger.debug(
                        "listing rbd's in {}".format(self.pool_name))
                    rbd_images = rbd_inst.list(ioctx)
        except rados.Error:
            # BUGFIX: an unhandled rados error used to kill this thread with
            # a raw traceback on stderr; log it and report 0 rbds instead so
            # the joining caller still gets a usable (if partial) total
            self.logger.exception(
                "pool scan failed for '{}'".format(self.pool_name))
            return

        self.logger.info("pool scan complete for '{}'".format(self.pool_name))
        self.num_rbds = len(rbd_images)
+ cluster_metrics = { + "num_mon": ("num_mon", "gauge"), + "num_mon_quorum": ("num_mon_quorum", "gauge"), + "num_rbds": ("num_rbds", "gauge"), + "num_osd_hosts": ("num_osd_hosts", "gauge"), + "num_osd": ("num_osd", "gauge"), + "num_osd_up": ("num_osd_up", "gauge"), + "num_osd_in": ("num_osd_in", "gauge"), + "osd_epoch": ("osd_epoch", "gauge"), + "osd_bytes": ("osd_bytes", "gauge"), + "osd_bytes_used": ("osd_bytes_used", "gauge"), + "osd_bytes_avail": ("osd_bytes_avail", "gauge"), + "num_pool": ("num_pool", "gauge"), + "num_pg": ("num_pg", "gauge"), + "num_pg_active_clean": ("num_pg_active_clean", "gauge"), + "num_pg_active": ("num_pg_active", "gauge"), + "num_pg_peering": ("num_pg_peering", "gauge"), + "num_object": ("num_object", "gauge"), + "num_object_degraded": ("num_object_degraded", "gauge"), + "num_object_misplaced": ("num_object_misplaced", "gauge"), + "num_object_unfound": ("num_object_unfound", "gauge"), + "num_bytes": ("num_bytes", "gauge"), + "num_mds_up": ("num_mds_up", "gauge"), + "num_mds_in": ("num_mds_in", "gauge"), + "num_mds_failed": ("num_mds_failed", "gauge"), + "mds_epoch": ("mds_epoch", "gauge"), + "health": ("health", "gauge") + } + + pool_client_metrics = { + 'bytes_sec': ("bytes_sec", "gauge"), + 'op_per_sec': ("op_per_sec", "gauge"), + 'read_bytes_sec': ("read_bytes_sec", "gauge"), + 'write_op_per_sec': ("write_op_per_sec", "gauge"), + 'write_bytes_sec': ("write_bytes_sec", "gauge"), + 'read_op_per_sec': ("read_op_per_sec", "gauge") + } + + pool_recovery_metrics = { + "recovering_objects_per_sec": ("recovering_objects_per_sec", "gauge"), + "recovering_bytes_per_sec": ("recovering_bytes_per_sec", "gauge"), + "recovering_keys_per_sec": ("recovering_keys_per_sec", "gauge"), + "num_objects_recovered": ("num_objects_recovered", "gauge"), + "num_bytes_recovered": ("num_bytes_recovered", "gauge"), + "num_keys_recovered": ("num_keys_recovered", "gauge") + } + + osd_metrics = { + "status": ("status", "gauge") + } + + mon_states = { + "mon_status": 
("mon_status", "gauge") + } + + all_metrics = common.merge_dicts( + pool_recovery_metrics, pool_client_metrics) + all_metrics = common.merge_dicts(all_metrics, cluster_metrics) + all_metrics = common.merge_dicts(all_metrics, osd_metrics) + all_metrics = common.merge_dicts(all_metrics, mon_states) + + def __init__(self, *args, **kwargs): + base.BaseCollector.__init__(self, *args, **kwargs) + self.version = self._get_version() + if self.version < 12: + self.get_mon_health = self._mon_health + else: + self.get_mon_health = self._mon_health_new + + def _get_version(self): + vers_info = self._mon_command('version') + return int(vers_info['version'].replace('.', ' ').split()[2]) + + def _mon_command(self, cmd_request): + """ Issue a command to the monitor """ + + buf_s = '{}' + conf_file = "/etc/ceph/{}.conf".format(self.cluster_name) + + start = time.time() + with rados.Rados(conffile=conf_file) as cluster: + cmd = {'prefix': cmd_request, 'format': 'json'} + rc, buf_s, out = cluster.mon_command(json.dumps(cmd), b'') + end = time.time() + + self.logger.debug("_mon_command call '{}' :" + " {:.3f}s".format(cmd_request, + (end - start))) + + return json.loads(buf_s) + + @staticmethod + def get_feature_state(summary_data, pg_states): + """ + Look at the summary list to determine the state of RADOS features + :param summary_data: (list) summary data from a ceph health command + :return: (dict) dict indexed by feature + 0 Inactive, 1 Active, 2 Disabled + """ + feature_lookup = {"noscrub": "scrub", + "nodeep-scrub": "deep_scrub", + "norecover": "recovery", + "nobackfill": "backfill", + "norebalance": "rebalance", + "noout": "out", + "nodown": "down"} + + # Start with all features inactive i.e. 
enabled + feature_state = {feature_lookup.get(key): 0 for key in feature_lookup} + + for summary in summary_data: + summary_desc = summary.get('summary') + if "flag(s) set" in summary_desc: + flags = summary_desc.replace(' flag(s) set', '').split(',') + for disabled_feature in flags: + if disabled_feature in feature_lookup: + feature = feature_lookup.get(disabled_feature) + feature_state[feature] = 2 # feature disabled + + # Now use the current pg state names to determine whether a feature is + # active - if not it stays set to '0', which means inactive + pg_state_names = [pg_state.get('name') for pg_state in pg_states] + for pg_state in pg_state_names: + states = pg_state.split('+') + if 'recovering' in states: + feature_state['recovery'] = 1 # Active + continue + if 'backfilling' in states: + feature_state['backfill'] = 1 + continue + if 'deep' in states: + feature_state['deep_scrub'] = 1 + continue + if 'scrubbing' in states: + feature_state['scrub'] = 1 + + return feature_state + + @classmethod + def check_stuck_pgs(cls, summary_list): + bad_pg_words = ['pgs', 'stuck', 'inactive'] + stuck_pgs = 0 + for summary_data in summary_list: + if summary_data.get('severity') != 'HEALTH_ERR': + continue + if all(trigger in summary_data.get('summary') + for trigger in bad_pg_words): + stuck_pgs = int(summary_data.get('summary').split()[0]) + + return stuck_pgs + + def _mon_health_new(self): + + cluster, health_data = self._mon_health_common() + + mon_status_output = self._mon_command('mon_status') + quorum_list = mon_status_output.get('quorum') + mon_list = mon_status_output.get('monmap').get('mons') + mon_status = {} + for mon in mon_list: + state = 0 if mon.get('rank') in quorum_list else 4 + mon_status[mon.get('name')] = state + + cluster['mon_status'] = mon_status + + return cluster + + def _mon_health_common(self): + + # for v12 (Luminous and beyond) add the following setting to + # ceph.conf "mon_health_preluminous_compat=true" + # this will provide the same output 
as pre-luminous + + cluster_data = self._admin_socket().get('cluster') + pg_data = self._mon_command("pg stat") + health_data = self._mon_command("health") + health_text = health_data.get('overall_status', + health_data.get('status', '')) + + cluster = {Mon.cluster_metrics[k][0]: cluster_data[k] + for k in cluster_data} + + health_num = Mon.health.get(health_text, 16) + + cluster['health'] = health_num + + pg_states = pg_data.get('num_pg_by_state') # list of dict name,num + health_summary = health_data.get('summary', []) # list of issues + cluster['num_pgs_stuck'] = Mon.check_stuck_pgs(health_summary) + cluster['features'] = Mon.get_feature_state(health_summary, + pg_states) + + self.logger.debug( + 'Features:{}'.format(json.dumps(cluster['features']))) + + return cluster, health_data + + def _mon_health(self): + + cluster, health_data = self._mon_health_common() + + services = health_data.get('health').get('health_services') + monstats = {} + for svc in services: + if 'mons' in svc: + # Each monitor will have a numeric value denoting health + monstats = { mon.get('name'): Mon.health.get(mon.get('health')) + for mon in svc.get('mons')} + + cluster['mon_status'] = monstats + + return cluster + + @classmethod + def _seed(cls, metrics): + return {metrics[key][0]: 0 for key in metrics} + + def display_names(self, metric_format, metrics): + """ + convert the keys to the static descriptions + :return: + """ + return {metric_format[k][0]: metrics[k] + for k in metrics} if metrics else {} + + def _get_pool_stats(self): + """ get pool stats from rados """ + + raw_stats = self._mon_command('osd pool stats') + pool_stats = {} + + # process each pool + for pool in raw_stats: + + pool_name = pool['pool_name'].replace('.', '_') + client_io = self.display_names(Mon.pool_client_metrics, + pool.get('client_io_rate')) + recovery = self.display_names(Mon.pool_recovery_metrics, + pool.get('recovery_rate')) + + pool_md = {} + if client_io: + + # Add pool level aggregation + 
client_io['bytes_sec'] = client_io.get('read_bytes_sec', 0) + \ + client_io.get('write_bytes_sec', 0) + client_io["op_per_sec"] = client_io.get('read_op_per_sec', 0)+ \ + client_io.get('write_op_per_sec', 0) + pool_md = client_io + + else: + pool_md = Mon._seed(Mon.pool_client_metrics) + + if recovery: + pool_md = common.merge_dicts(pool_md, recovery) + else: + pool_md = common.merge_dicts(pool_md, Mon._seed( + Mon.pool_recovery_metrics)) + + pool_stats[pool_name] = pool_md + + return pool_stats + + def _get_osd_states(self): + + self.logger.debug("fetching osd states from the local mon") + raw = self._mon_command('osd dump') + osd_hosts = set() + osds = {} + for osd in raw.get('osds'): + cluster_addr = osd.get('cluster_addr').split(':')[0] + osd_hosts.add(cluster_addr) + + # NB. The key for the osds dict must be a string as the dict is + # flattened when the metric name is derived in the parent collectd + # module. If it is not converted, you get a TypeError + osds[str(osd.get('osd'))] = {"up": osd.get('up'), + "in": osd.get('in')} + + return len(osd_hosts), osds + + @staticmethod + def _select_pools(pools, mons): + """ + determine the pools this mon should scan based on it's name. We select + pools from the an offset into the pool list, and then repeat at an + interval set by # mons in the configuration. This splits up the pools + we have, so each mon looks at a discrete set of pools instead of all + mons performing all scans. + :param pools: (list) rados pool names + :param mons: (list) monitor names from ceph health + :return: (list) of pools this monitor should scan. empty list if the + monitor name mismatches - so no scans done + """ + + pools_to_scan = [] + + try: + freq = mons.index(common.get_hostname()) + except ValueError: + # this host's name is not in the monitor list? 
+ # twilight zone moment + pass + else: + + pools_to_scan = [pools[ptr] + for ptr in xrange(freq, len(pools), len(mons))] + + return pools_to_scan + + def get_pools(self): + skip_pools = ('default.rgw') + + start = time.time() + conf_file = "/etc/ceph/{}.conf".format(self.cluster_name) + with rados.Rados(conffile=conf_file) as cluster: + rados_pools = sorted(cluster.list_pools()) + end = time.time() + + self.logger.debug('lspools took {:.3f}s'.format(end - start)) + + filtered_pools = [pool for pool in rados_pools + if not pool.startswith(skip_pools)] + + return filtered_pools + + def _get_rbds(self, monitors): + + pool_list = self.get_pools() + mon_list = sorted(monitors.keys()) + my_pools = Mon._select_pools(pool_list, mon_list) + self.logger.debug("Pools to be scanned on this mon" + " : {}".format(','.join(my_pools))) + threads = [] + + start = time.time() + + for pool in my_pools: + thread = RBDScanner(self.cluster_name, pool) + thread.start() + threads.append(thread) + + # wait for all threads to complete + for thread in threads: + thread.join(1) + + end = time.time() + self.logger.debug("rbd scans {:.3f}s".format((end - start))) + + total_rbds = sum([thread.num_rbds for thread in threads]) + self.logger.debug("total rbds found : {}".format(total_rbds)) + + for thread in threads: + del thread + + return total_rbds + + def get_stats(self): + """ + method associated with the plugin callback to gather the metrics + :return: (dict) metadata describing the state of the mon/osd's + """ + + start = time.time() + + pool_stats = self._get_pool_stats() + num_osd_hosts, osd_states = self._get_osd_states() + cluster_state = self.get_mon_health() + cluster_state['num_osd_hosts'] = num_osd_hosts + cluster_state['num_rbds'] = self._get_rbds(cluster_state['mon_status']) + + all_stats = common.merge_dicts(cluster_state, {"pools": pool_stats, + "osd_state": osd_states}) + + end = time.time() + self.logger.info("mon get_stats call : {:.3f}s".format((end - start))) + + return 
class OSDstats(object):
    """Per-OSD statistics holder.

    Keeps the two most recent cumulative filestore perf-dump samples and
    derives per-interval average latencies from their delta, plus the
    capacity/usage figures from the 'osd' section.
    """

    # capacity counters exposed by the osd perf dump ('osd' section):
    # counter -> (reported name, collectd data type)
    osd_capacity = {
        "stat_bytes": ("stat_bytes", "gauge"),
        "stat_bytes_used": ("stat_bytes_used", "gauge"),
        "stat_bytes_avail": ("stat_bytes_avail", "gauge")
    }

    # cumulative latency counters tracked from the 'filestore' section
    filestore_metrics = {
        "journal_latency",
        "commitcycle_latency",
        "apply_latency",
        "queue_transaction_latency_avg"
    }

    def __init__(self, osd_type='filestore'):
        self._current = {}    # most recent filestore sample
        self._previous = {}   # prior filestore sample (empty on first pass)
        self._osd_type = osd_type
        self.osd_percent_used = 0

    def update(self, stats):
        """
        update the objects attributes based on the dict
        :param stats: (dict) containing filestore performance ('filestore')
               and capacity info ('osd')
        :return: None
        """
        # rotate samples: the counters are cumulative, so latencies must be
        # derived from the delta between consecutive samples
        if self._current:
            self._previous = self._current
        self._current = stats['filestore']

        for attr in OSDstats.filestore_metrics:

            if self._previous:
                d_sum = (self._current[attr].get('sum') -
                         self._previous[attr].get('sum'))
                d_avgcount = (self._current[attr].get('avgcount') -
                              self._previous[attr].get('avgcount'))

                # a no-op interval yields 0 rather than a divide-by-zero
                if d_sum == 0 or d_avgcount == 0:
                    val = 0
                else:
                    val = float(d_sum) / d_avgcount
            else:
                # no previous sample yet, so no delta can be formed
                val = 0

            setattr(self, attr, val)

        for attr in stats['osd']:
            setattr(self, attr, stats['osd'].get(attr))

        # BUGFIX: an OSD reporting 0 total bytes (e.g. not yet active)
        # previously raised ZeroDivisionError here
        if self.stat_bytes:
            self.osd_percent_used = math.ceil(
                (float(self.stat_bytes_used) / self.stat_bytes) * 100)
        else:
            self.osd_percent_used = 0
int(time.time()) + + self.osd = {} # dict of disk objects, each disk contains osd_id + self.jrnl = {} # dict of journal devices (if not collocated) + self.osd_id_list = [] + self.dev_lookup = {} # dict dev_name -> osd | jrnl + self.osd_count = 0 + + def __repr__(self): + + s = '' + for disk in self.osd: + s += "{}\n".format(disk) + dev = self.osd[disk] + + for var in vars(dev): + if not var.startswith('_'): + s += "{} ... {}\n".format(var, getattr(dev, var)) + return s + + def _fetch_osd_stats(self, osd_id): + + # NB: osd stats are cumulative + + stats = {} + osd_socket_name = '/var/run/ceph/{}-osd.{}.asok'.format( + self.cluster_name, osd_id) + + if not os.path.exists(osd_socket_name): + # all OSD's should expose an admin socket, so if it's missing + # this node has a problem! + raise IOError("Socket file missing for OSD {}".format(osd_id)) + + self.logger.debug("fetching osd stats for osd {}".format(osd_id)) + resp = self._admin_socket(socket_path=osd_socket_name) + + filestore_stats = resp.get('filestore') + stats['filestore'] = {key_name: filestore_stats.get(key_name) + for key_name in OSDstats.filestore_metrics} + + osd_stats = resp.get('osd') + + # Add disk usage stats + stats['osd'] = {key_name: osd_stats.get(key_name) + for key_name in OSDstats.osd_capacity.keys()} + + return stats + + @staticmethod + def get_osd_type(osd_path): + + osd_type_fname = os.path.join(osd_path, 'type') + if os.path.exists(osd_type_fname): + return common.fread(osd_type_fname) + else: + if os.path.exists(os.path.join(osd_path, 'journal')): + return "filestore" + else: + raise ValueError("Unrecognised OSD type") + + def _dev_to_osd(self): + """ + Look at the system to determine which disks are acting as OSD's + """ + + # the logic here uses the mount points to determine which OSD's are + # in the system. The encryption state is determine just by the use + # devicemapper (i.e. /dev/mapper prefixed devices) - since at this time + # this is all dm is used for. 
+ + osd_indicators = {'var', 'lib', 'osd'} + + for mnt in common.freadlines('/proc/mounts'): + items = mnt.split(' ') + dev_path, path_name = items[:2] + if path_name.startswith('/var/lib'): + # take a close look since this is where ceph osds usually + # get mounted + dirs = set(path_name.split('/')) + if dirs.issuperset(osd_indicators): + + # get the osd_id from the name is the most simple way + # to get the id, due to naming conventions. If this fails + # though, plan 'b' is the whoami file + osd_id = path_name.split('-')[-1] + if not osd_id.isdigit(): + osd_id = common.fread( + os.path.join(path_name, 'whoami')) + + if osd_id not in self.osd: + osd_type = OSDs.get_osd_type(path_name) + self.osd[osd_id] = OSDstats(osd_type=osd_type) + self.osd_id_list.append(osd_id) + + osd_type = self.osd[osd_id]._osd_type + if osd_type == 'filestore': + if dev_path.startswith('/dev/mapper'): + encrypted = 1 + uuid = dev_path.split('/')[-1] + partuuid = '/dev/disk/by-partuuid/{}'.format( + uuid) + dev_path = os.path.realpath(partuuid) + osd_device = dev_path.split('/')[-1] + else: + encrypted = 0 + osd_device = dev_path.split('/')[-1] + + elif osd_type == 'bluestore': + block_link = os.path.join(path_name, 'block') + osd_path = os.path.realpath(block_link) + osd_device = osd_path.split('/')[-1] + encrypted = 0 + else: + raise ValueError("Unknown OSD type encountered") + + # if the osd_id hasn't been seem neither has the + # disk + self.osd[osd_device] = common.Disk( + osd_device, + path_name=path_name, + osd_id=osd_id, + in_osd_type=osd_type, + encrypted=encrypted) + self.dev_lookup[osd_device] = 'osd' + self.osd_count += 1 + + if osd_type == 'filestore': + journal_link = os.path.join(path_name, 'journal') + else: + journal_link = os.path.join(path_name, 'block.wal') + + if os.path.exists(journal_link): + link_tgt = os.readlink(journal_link) + if link_tgt.startswith('/dev/mapper'): + encrypted = 1 + else: + encrypted = 0 + + partuuid_path = os.path.join( + 
'/dev/disk/by-partuuid', + link_tgt.split('/')[-1]) + jrnl_path = os.path.realpath(partuuid_path) + jrnl_dev = jrnl_path.split('/')[-1] + + if jrnl_dev not in self.osd: + self.jrnl[jrnl_dev] = common.Disk( + jrnl_dev, + osd_id=osd_id, + in_osd_type=osd_type, + encrypted=encrypted) + + self.dev_lookup[jrnl_dev] = 'jrnl' + + else: + # No journal or WAL link..? + pass + + def _stats_lookup(self): + """ + Grab the disk stats from /proc/diskstats, and the key osd perf dump + counters + """ + + now = time.time() + interval = int(now) - self.timestamp + self.timestamp = int(now) + + # Fetch diskstats from the OS + for perf_entry in common.freadlines('/proc/diskstats'): + + field = perf_entry.split() + dev_name = field[2] + + device = None + if self.dev_lookup.get(dev_name, None) == 'osd': + device = self.osd[dev_name] + elif self.dev_lookup.get(dev_name, None) == 'jrnl': + device = self.jrnl[dev_name] + + if device: + new_stats = field[3:] + + if device.perf._current: + device.perf._previous = device.perf._current + device.perf._current = new_stats + else: + device.perf._current = new_stats + + device.perf.compute(interval) + device.refresh() + + end = time.time() + self.logger.debug("OS disk stats calculated in " + "{:.4f}s".format(end-now)) + + # fetch stats from each osd daemon + osd_stats_start = time.time() + for osd_id in self.osd_id_list: + + if self.osd[osd_id]._osd_type == 'filestore': + osd_stats = self._fetch_osd_stats(osd_id) + + # self.logger.debug('stats : {}'.format(osd_stats)) + + osd_device = self.osd[osd_id] + osd_device.update(osd_stats) + else: + self.logger.debug("skipped 'bluestore' osd perf collection " + "for osd.{}".format(osd_id)) + + osd_stats_end = time.time() + self.logger.debug( + "OSD perf dump stats collected for {} OSDs in {:.3f}s".format( + len(self.osd_id_list), (osd_stats_end - osd_stats_start))) + + @staticmethod + def _dump_devs(device_dict): + + dumped = {} + + for dev_name in sorted(device_dict): + device = device_dict[dev_name] + 
class RGW(base.BaseCollector):
    """Collect radosgw performance counters from the local admin socket."""

    # perf-dump counter -> (reported metric name, collectd data type)
    simple_metrics = {
        "req": ("requests", "derive"),
        "failed_req": ("requests_failed", "derive"),
        "get": ("gets", "derive"),
        "get_b": ("get_bytes", "derive"),
        "put": ("puts", "derive"),
        "put_b": ("put_bytes", "derive"),
        "qlen": ("qlen", "derive"),
        "qactive": ("requests_active", "derive")
    }

    # counters whose value is a sub-dict that must be flattened
    int_latencies = [
        "get_initial_lat",
        "put_initial_lat"
    ]

    latencies = {
        "get_initial_lat_sum": ("get_initial_lat_sum", "derive"),
        "get_initial_lat_avgcount": ("get_initial_lat_avgcount", "derive"),
        "put_initial_lat_sum": ("put_initial_lat_sum", "derive"),
        "put_initial_lat_avgcount": ("put_initial_lat_avgcount", "derive")
    }

    all_metrics = common.merge_dicts(simple_metrics, latencies)

    def __init__(self, cluster_name, admin_socket, **kwargs):
        base.BaseCollector.__init__(self, cluster_name, admin_socket, **kwargs)
        self.host_name = common.get_hostname()

    def _get_rgw_data(self):
        """Return this host's rgw client section from the admin socket."""
        response = self._admin_socket()

        # the perf dump nests everything under the local client instance
        key_name = 'client.rgw.{}'.format(self.host_name)

        return response.get(key_name)

    def _filter(self, stats):
        """Reduce a raw perf dump to the metrics this collector reports."""
        # scalar counters are copied straight through
        subset = {name: stats[name] for name in RGW.simple_metrics}

        # latency counters are sub-dicts; flatten to '<metric>_<field>'
        for metric in RGW.int_latencies:
            for field in stats[metric]:
                subset["{}_{}".format(metric, field)] = \
                    stats[metric].get(field)

        return subset

    def get_stats(self):
        """Sample, filter and return rgw metrics, keyed under 'rgw'."""
        begin = time.time()

        filtered = self._filter(self._get_rgw_data())

        self.logger.info(
            "RGW get_stats : {:.3f}s".format(time.time() - begin))

        return {"rgw": filtered}