git-server-git.apps.pok.os.sepia.ceph.com Git - cephmetrics.git/commitdiff
Separate the collectd plugin from the collectors
authorZack Cerza <zack@redhat.com>
Thu, 10 Aug 2017 16:05:35 +0000 (09:05 -0700)
committerZack Cerza <zack@redhat.com>
Thu, 10 Aug 2017 17:55:49 +0000 (10:55 -0700)
This will enable me to, in a later PR, properly package the collectors
so that they can be placed in site-packages and used without collectd

Signed-off-by: Zack Cerza <zack@redhat.com>
21 files changed:
ansible/roles/ceph-collectd/files/cephmetrics [deleted symlink]
ansible/roles/ceph-collectd/files/cephmetrics.py [new symlink]
ansible/roles/ceph-collectd/files/cephmetrics_collectors [new symlink]
ansible/roles/ceph-collectd/tasks/configure_collectd.yml
ansible/roles/ceph-collectd/tasks/install_collectd_plugins.yml
cephmetrics.py [new file with mode: 0644]
cephmetrics/__init__.py [deleted file]
cephmetrics/collectors/__init__.py [deleted file]
cephmetrics/collectors/base.py [deleted file]
cephmetrics/collectors/common.py [deleted file]
cephmetrics/collectors/iscsi.py [deleted file]
cephmetrics/collectors/mon.py [deleted file]
cephmetrics/collectors/osd.py [deleted file]
cephmetrics/collectors/rgw.py [deleted file]
cephmetrics_collectors/__init__.py [new file with mode: 0644]
cephmetrics_collectors/base.py [new file with mode: 0644]
cephmetrics_collectors/common.py [new file with mode: 0644]
cephmetrics_collectors/iscsi.py [new file with mode: 0644]
cephmetrics_collectors/mon.py [new file with mode: 0644]
cephmetrics_collectors/osd.py [new file with mode: 0644]
cephmetrics_collectors/rgw.py [new file with mode: 0644]

diff --git a/ansible/roles/ceph-collectd/files/cephmetrics b/ansible/roles/ceph-collectd/files/cephmetrics
deleted file mode 120000 (symlink)
index e2d0dbe..0000000
+++ /dev/null
@@ -1 +0,0 @@
-../../../../cephmetrics/
\ No newline at end of file
diff --git a/ansible/roles/ceph-collectd/files/cephmetrics.py b/ansible/roles/ceph-collectd/files/cephmetrics.py
new file mode 120000 (symlink)
index 0000000..8de2567
--- /dev/null
@@ -0,0 +1 @@
+../../../../cephmetrics.py
\ No newline at end of file
diff --git a/ansible/roles/ceph-collectd/files/cephmetrics_collectors b/ansible/roles/ceph-collectd/files/cephmetrics_collectors
new file mode 120000 (symlink)
index 0000000..7451887
--- /dev/null
@@ -0,0 +1 @@
+../../../../cephmetrics_collectors
\ No newline at end of file
index d4df13263be9af6677a069b28982228e6200b7c4..a967c8651aa68f4d129426eb32da2966822fe70e 100644 (file)
@@ -51,5 +51,5 @@
   replace:
     dest: "{{ collectd_conf_d }}/cephmetrics.conf"
     regexp: 'ModulePath ".*"'
-    replace: 'ModulePath "{{ collectd_cephmetrics_dir }}"'
+    replace: 'ModulePath "{{ collectd_dir }}"'
   notify: Restart collectd
index 2deb6256ddbea4a00f0338b5c488c60e49c1c55e..ba0fff8f17303b3e1a467a44e674ef737948ee2d 100644 (file)
@@ -3,21 +3,26 @@
   set_fact:
     collectd_dir: "/usr/lib{{ '64' if ansible_pkg_mgr == 'yum' else '' }}/collectd"
 
-- name: Set collectd_cephmetrics_dir
+- name: Set collectors_dir
   set_fact:
-    collectd_cephmetrics_dir: "{{ collectd_dir }}/cephmetrics"
+    collectors_dir: "{{ collectd_dir }}/cephmetrics_collectors"
 
 - name: Remove stale Python files
   file:
     dest: "{{ item }}"
     state: absent
   with_items:
-    - "{{ collectd_cephmetrics_dir }}/cephmetrics.py"
-    - "{{ collectd_cephmetrics_dir }}/cephmetrics.pyc"
-    - "{{ collectd_cephmetrics_dir }}/collectors"
+    - "{{ collectd_dir }}/cephmetrics"
+    - "{{ collectors_dir }}/collectors"
 
-- name: Ship collector plugins
+- name: Ship collectors
   copy:
-    src: files/cephmetrics
-    dest: "{{ collectd_cephmetrics_dir }}"
+    src: files/cephmetrics_collectors/
+    dest: "{{ collectors_dir }}"
+  notify: Restart collectd
+
+- name: Ship collectd plugin
+  copy:
+    src: files/cephmetrics.py
+    dest: "{{ collectd_dir }}"
   notify: Restart collectd
diff --git a/cephmetrics.py b/cephmetrics.py
new file mode 100644 (file)
index 0000000..afb2574
--- /dev/null
@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+import logging
+import os
+import sys
+
+TEST_MODE = (sys.argv[0].split('/')[-1] == 'py.test')
+try:
+    import collectd
+except ImportError:
+    if not TEST_MODE:
+        raise
+
+from cephmetrics_collectors import (Ceph, common, iscsi, mon, osd, rgw)
+
+__author__ = 'Paul Cuzner'
+
+PLUGIN_NAME = 'cephmetrics'
+
+
+def write_stats(role_metrics, stats):
+
+    flat_stats = common.flatten_dict(stats, '.')
+    for key_name in flat_stats:
+        attr_name = key_name.split('.')[-1]
+
+        # TODO: this needs some more think time, since the key from the name
+        # is not the key of the all_metrics dict
+        if attr_name in role_metrics:
+            attr_type = role_metrics[attr_name][1]     # gauge / derive etc
+        else:
+            # assign a default
+            attr_type = 'gauge'
+
+        attr_value = flat_stats[key_name]
+
+        val = collectd.Values(plugin=PLUGIN_NAME, type=attr_type)
+        instance_name = "{}.{}".format(CEPH.cluster_name,
+                                       key_name)
+        val.type_instance = instance_name
+        val.values = [attr_value]
+        val.dispatch()
+
+
+def configure_callback(conf):
+
+    valid_log_levels = ['debug', 'info']
+
+    global CEPH
+    module_parms = {node.key: node.values[0] for node in conf.children}
+
+    log_level = module_parms.get('LogLevel', 'debug')
+    if log_level not in valid_log_levels:
+        collectd.error("LogLevel specified is invalid - must"
+                       " be :{}".format(' or '.join(valid_log_levels)))
+
+    if 'ClusterName' in module_parms:
+        cluster_name = module_parms['ClusterName']
+        # cluster name is all we need to get started
+        if not os.path.exists('/etc/ceph/{}.conf'.format(cluster_name)):
+            collectd.error("Clustername given ('{}') not found in "
+                           "/etc/ceph".format(module_parms['ClusterName']))
+
+        # let's assume the conf file is OK to use
+        CEPH.cluster_name = cluster_name
+
+        setup_module_logging(log_level)
+
+        CEPH.probe()
+        collectd.info(
+            "{}: Roles detected - mon:{} osd:{} rgw:{} iscsi:{}".format(
+                __name__,
+                isinstance(CEPH.mon, mon.Mon),
+                isinstance(CEPH.osd, osd.OSDs),
+                isinstance(CEPH.rgw, rgw.RGW),
+                isinstance(CEPH.iscsi, iscsi.ISCSIGateway),
+            ))
+
+    else:
+        collectd.error("ClusterName is required")
+
+
+def setup_module_logging(log_level, path='/var/log/collectd-cephmetrics.log'):
+
+    level = {"debug": logging.DEBUG,
+             "info": logging.INFO}
+
+    logging.getLogger('cephmetrics')
+    log_conf = dict(
+        format='%(asctime)s - %(levelname)-7s - '
+               '[%(filename)s:%(lineno)s:%(funcName)s() - '
+               '%(message)s',
+        level=level.get(log_level)
+    )
+    if path:
+        log_conf['filename'] = path
+    logging.basicConfig(**log_conf)
+
+
+def read_callback():
+
+    if CEPH.mon:
+        mon_stats = CEPH.mon.get_stats()
+        write_stats(mon.Mon.all_metrics, mon_stats)
+
+    if CEPH.rgw:
+        rgw_stats = CEPH.rgw.get_stats()
+        write_stats(rgw.RGW.all_metrics, rgw_stats)
+
+    if CEPH.osd:
+        osd_node_stats = CEPH.osd.get_stats()
+        write_stats(osd.OSDs.all_metrics, osd_node_stats)
+
+    if CEPH.iscsi:
+        iscsi_stats = CEPH.iscsi.get_stats()
+        write_stats(iscsi.ISCSIGateway.metrics, iscsi_stats)
+
+
+if TEST_MODE:
+    pass
+else:
+    CEPH = Ceph()
+    collectd.register_config(configure_callback)
+    collectd.register_read(read_callback)
diff --git a/cephmetrics/__init__.py b/cephmetrics/__init__.py
deleted file mode 100644 (file)
index 4f230d3..0000000
+++ /dev/null
@@ -1,172 +0,0 @@
-#!/usr/bin/env python
-import sys
-TEST_MODE = (sys.argv[0].split('/')[-1] == 'py.test')
-
-import os
-import glob
-import logging
-
-try:
-    import collectd
-except ImportError:
-    if not TEST_MODE:
-        raise
-
-from collectors import (common, iscsi, mon, osd, rgw)
-
-__author__ = 'Paul Cuzner'
-
-PLUGIN_NAME = 'cephmetrics'
-
-
-class Ceph(object):
-    def __init__(self):
-        self.cluster_name = None
-        self.host_name = common.get_hostname()
-
-        self.mon_socket = None
-        self.rgw_socket = None
-
-        self.mon = None
-        self.rgw = None
-        self.osd = None
-        self.iscsi = None
-
-    def probe(self):
-        """
-        set up which collector(s) to use, based on what types of sockets we
-        find in /var/run/ceph
-        """
-
-        mon_socket = '/var/run/ceph/{}-mon.{}.asok'.format(self.cluster_name,
-                                                           self.host_name)
-        if os.path.exists(mon_socket):
-            self.mon_socket = mon_socket
-            self.mon = mon.Mon(self.cluster_name,
-                           admin_socket=mon_socket)
-
-        rgw_socket_list = glob.glob('/var/run/ceph/{}-client.rgw.*.'
-                                    'asok'.format(self.cluster_name))
-
-        if rgw_socket_list:
-            rgw_socket = rgw_socket_list[0]
-            self.rgw = rgw.RGW(self.cluster_name,
-                           admin_socket=rgw_socket)
-
-        osd_socket_list = glob.glob('/var/run/ceph/{}-osd.*'
-                                    '.asok'.format(self.cluster_name))
-        mounted = common.freadlines('/proc/mounts')
-        osds_mounted = [mnt for mnt in mounted
-                        if mnt.split()[1].startswith('/var/lib/ceph')]
-        if osd_socket_list or osds_mounted:
-            self.osd = osd.OSDs(self.cluster_name)
-
-        if os.path.exists('/sys/kernel/config/target/iscsi'):
-            self.iscsi = iscsi.ISCSIGateway(self.cluster_name)
-
-        collectd.info(
-            "{}: Roles detected - mon:{} osd:{} rgw:{} iscsi:{}".format(
-                __name__,
-                isinstance(self.mon, mon.Mon),
-                isinstance(self.osd, osd.OSDs),
-                isinstance(self.rgw, rgw.RGW),
-                isinstance(self.iscsi, iscsi.ISCSIGateway),
-            ))
-
-
-def write_stats(role_metrics, stats):
-
-    flat_stats = common.flatten_dict(stats, '.')
-    for key_name in flat_stats:
-        attr_name = key_name.split('.')[-1]
-
-        # TODO: this needs some more think time, since the key from the name
-        # is not the key of the all_metrics dict
-        if attr_name in role_metrics:
-            attr_type = role_metrics[attr_name][1]     # gauge / derive etc
-        else:
-            # assign a default
-            attr_type = 'gauge'
-
-        attr_value = flat_stats[key_name]
-
-        val = collectd.Values(plugin=PLUGIN_NAME, type=attr_type)
-        instance_name = "{}.{}".format(CEPH.cluster_name,
-                                       key_name)
-        val.type_instance = instance_name
-        val.values = [attr_value]
-        val.dispatch()
-
-
-def configure_callback(conf):
-
-    valid_log_levels = ['debug', 'info']
-
-    global CEPH
-    module_parms = {node.key: node.values[0] for node in conf.children}
-
-    log_level = module_parms.get('LogLevel', 'debug')
-    if log_level not in valid_log_levels:
-        collectd.error("LogLevel specified is invalid - must"
-                       " be :{}".format(' or '.join(valid_log_levels)))
-
-    if 'ClusterName' in module_parms:
-        cluster_name = module_parms['ClusterName']
-        # cluster name is all we need to get started
-        if not os.path.exists('/etc/ceph/{}.conf'.format(cluster_name)):
-            collectd.error("Clustername given ('{}') not found in "
-                           "/etc/ceph".format(module_parms['ClusterName']))
-
-        # let's assume the conf file is OK to use
-        CEPH.cluster_name = cluster_name
-
-        setup_module_logging(log_level)
-
-        CEPH.probe()
-
-    else:
-        collectd.error("ClusterName is required")
-
-
-def setup_module_logging(log_level, path='/var/log/collectd-cephmetrics.log'):
-
-    level = {"debug": logging.DEBUG,
-             "info": logging.INFO}
-
-    logging.getLogger('cephmetrics')
-    log_conf = dict(
-        format='%(asctime)s - %(levelname)-7s - '
-               '[%(filename)s:%(lineno)s:%(funcName)s() - '
-               '%(message)s',
-        level=level.get(log_level)
-    )
-    if path:
-        log_conf['filename'] = path
-    logging.basicConfig(**log_conf)
-
-
-def read_callback():
-
-    if CEPH.mon:
-        mon_stats = CEPH.mon.get_stats()
-        write_stats(mon.Mon.all_metrics, mon_stats)
-
-    if CEPH.rgw:
-        rgw_stats = CEPH.rgw.get_stats()
-        write_stats(rgw.RGW.all_metrics, rgw_stats)
-
-    if CEPH.osd:
-        osd_node_stats = CEPH.osd.get_stats()
-        write_stats(osd.OSDs.all_metrics, osd_node_stats)
-
-    if CEPH.iscsi:
-        iscsi_stats = CEPH.iscsi.get_stats()
-        write_stats(iscsi.ISCSIGateway.metrics, iscsi_stats)
-
-
-if TEST_MODE:
-    pass
-else:
-    CEPH = Ceph()
-    collectd.register_config(configure_callback)
-    collectd.register_read(read_callback)
diff --git a/cephmetrics/collectors/__init__.py b/cephmetrics/collectors/__init__.py
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/cephmetrics/collectors/base.py b/cephmetrics/collectors/base.py
deleted file mode 100644 (file)
index f2f2295..0000000
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/usr/bin/env python
-
-import json
-import time
-import logging
-
-from ceph_daemon import admin_socket
-
-
-class BaseCollector(object):
-
-    def __init__(self, cluster_name, admin_socket=None):
-        self.cluster_name = cluster_name
-        self.admin_socket = admin_socket
-
-        self.logger = logging.getLogger('cephmetrics')
-
-    def _admin_socket(self, cmds=None, socket_path=None):
-
-        adm_socket = self.admin_socket if not socket_path else socket_path
-
-        if not cmds:
-            cmds = ['perf', 'dump']
-
-        start = time.time()
-        response = admin_socket(adm_socket, cmds,
-                                format='json')
-        end = time.time()
-
-        self.logger.debug("admin_socket call '{}' : "
-                          "{:.3f}s".format(' '.join(cmds),
-                                           (end - start)))
-
-        return json.loads(response)
-
-    def get_stats(self):
-
-        return {}
diff --git a/cephmetrics/collectors/common.py b/cephmetrics/collectors/common.py
deleted file mode 100644 (file)
index 8b94dcc..0000000
+++ /dev/null
@@ -1,247 +0,0 @@
-#!/usr/bin/env python
-
-
-import socket
-from os import statvfs
-import math
-
-
-def get_hostname():
-    return socket.gethostname().split('.')[0]
-
-
-def add_dicts(dict1, dict2):
-    """
-    Add dictionary values together
-    :param dict1:
-    :param dict2:
-    :return: dict with matching fields sum'd together
-    """
-    return {key: dict1.get(key, 0) + dict2.get(key, 0)
-            for key in set(dict1).union(dict2)}
-
-
-def merge_dicts(dict1, dict2):
-    """
-    merges two dicts together to form a single dict. when dict keys overlap
-    the value in the 2nd dict takes precedence
-    :param dict1:
-    :param dict2:
-    :return: combined dict
-    """
-
-    new = dict1.copy()
-    new.update(dict2)
-
-    return new
-
-
-def flatten_dict(data, separator='.', prefix=''):
-    """
-    flatten a dict, so it is just simple key/value pairs
-    :param data: (dict)
-    :param separator: (str) char to use when combining keys
-    :param prefix: key prefix
-    :return:
-    """
-    return {prefix + separator + k if prefix else k: v
-            for kk, vv in data.items()
-            for k, v in flatten_dict(vv, separator, kk).items()
-            } if isinstance(data, dict) else {prefix: data}
-
-
-def todict(obj):
-    """
-    convert an object to a dict representation
-    :param obj: (object) object to examine, to extract variables/values from
-    :return: (dict) representation of the given object
-    """
-    data = {}
-    for key, value in obj.__dict__.iteritems():
-
-        if key.startswith('_'):
-            continue
-
-        try:
-            data[key] = todict(value)
-        except AttributeError:
-            data[key] = value
-
-    return data
-
-
-def fread(file_name=None):
-    """
-    Simple read function for files of a single value
-    :param file_name: (str) file name to read
-    :return: (str) contents of the file
-    """
-
-    with open(file_name, 'r') as f:
-        setting = f.read().rstrip()
-    return setting
-
-
-def freadlines(file_name=None):
-    """
-    simple readlines function to return all records of a given file
-    :param file_name: (str) file name to read
-    :return: (list) contents of the file
-    """
-
-    with open(file_name, 'r') as f:
-        data = f.readlines()
-    return data
-
-
-class IOstat(object):
-    raw_metrics = [
-        "_reads",
-        "_reads_mrgd",
-        "_sectors_read",
-        "_read_ms",
-        "_writes",
-        "_writes_mrgd",
-        "_sectors_written",
-        "_write_ms",
-        "_current_io",
-        "_ms_active_io",
-        "_ms_active_io_w"
-    ]
-
-    sector_size = 512
-
-    metrics = {
-        "iops": ("iops", "gauge"),
-        "r_iops": ("r_iops", "gauge"),
-        "w_iops": ("w_iops", "gauge"),
-        "bytes_per_sec": ("bytes_per_sec", "gauge"),
-        "r_bytes_per_sec": ("r_bytes_per_sec", "gauge"),
-        "w_bytes_per_sec": ("w_bytes_per_sec", "gauge"),
-        "util": ("util", "gauge"),
-        "await": ("await", "gauge"),
-        "r_await": ("r_await", "gauge"),
-        "w_await": ("w_await", "gauge"),
-    }
-
-    def __init__(self):
-        self._previous = []
-        self._current = []
-
-        # Seed the metrics we're interested in
-        for ctr in IOstat.metrics.keys():
-            setattr(self, ctr, 0)
-
-    def __str__(self):
-        s = '\n- IOstat object:\n'
-        for key in sorted(vars(self)):
-            s += '\t{} ... {}\n'.format(key, getattr(self, key))
-        return s
-
-    def _calc_raw_delta(self):
-        if not self._previous:
-            # nothing to compute yet
-            for ptr in range(len(IOstat.raw_metrics)):
-                key = IOstat.raw_metrics[ptr]
-                setattr(self, key, 0)
-        else:
-            for ptr in range(len(IOstat.raw_metrics)):
-                key = IOstat.raw_metrics[ptr]
-                setattr(self, key, (int(self._current[ptr]) -
-                                    int(self._previous[ptr])))
-
-    def compute(self, sample_interval):
-        """
-        Calculate the iostats for this device
-        """
-
-        self._calc_raw_delta()
-
-        if sample_interval > 0:
-            interval_ms = sample_interval * 1000
-            total_io = self._reads + self._writes
-            self.util = float(self._ms_active_io) / interval_ms * 100
-            self.iops = int(total_io) / sample_interval
-            self.r_iops = int(self._reads) / sample_interval
-            self.w_iops = int(self._writes) / sample_interval
-            self.await = (float(self._write_ms + self._read_ms) / total_io if
-                          total_io > 0 else 0)
-            self.w_await = float(
-                self._write_ms) / self._writes if self._writes > 0 else 0
-            self.r_await = float(
-                self._read_ms) / self._reads if self._reads > 0 else 0
-            self.r_bytes_per_sec = (float(
-                self._sectors_read * IOstat.sector_size)) / sample_interval
-            self.w_bytes_per_sec = (float(
-                self._sectors_written * IOstat.sector_size)) / sample_interval
-            self.bytes_per_sec = self.r_bytes_per_sec + self.w_bytes_per_sec
-
-
-class Disk(object):
-
-    metrics = {
-        "rotational": ("rotational", "gauge"),
-        "disk_size": ("disk_size", "gauge"),
-        "fs_size": ("fs_size", "gauge"),
-        "fs_used": ("fs_used", "gauge"),
-        "fs_percent_used": ("fs_percent_used", "gauge"),
-        "osd_id": ("osd_id", "gauge")
-    }
-
-    osd_types = {"filestore": 0, "bluestore": 1}
-
-    def __init__(self, device_name, path_name=None, osd_id=None,
-                 in_osd_type="filestore", encrypted=0):
-
-        self._name = device_name
-        self._path_name = path_name
-        self._base_dev = Disk.get_base_dev(device_name)
-        self.osd_id = osd_id
-
-        self.rotational = self._get_rota()
-        self.disk_size = self._get_size()
-        self.perf = IOstat()
-        self.fs_size = 0
-        self.fs_percent_used = 0
-        self.fs_used = 0
-        self.encrypted = encrypted
-        self.osd_type = Disk.osd_types[in_osd_type]
-
-        self.refresh()
-
-    def _get_size(self):
-        return int(fread("/sys/block/{}/size".format(self._base_dev))) * 512
-
-    def _get_rota(self):
-        return int(
-            fread("/sys/block/{}/queue/rotational".format(self._base_dev)))
-
-    def _get_fssize(self):
-        s = statvfs("{}/whoami".format(self._path_name))
-        fs_size = s.f_blocks * s.f_bsize
-        fs_used = fs_size - (s.f_bfree * s.f_bsize)
-        fs_percent_used = math.ceil((float(fs_used) / fs_size) * 100)
-        return fs_size, fs_used, fs_percent_used
-
-    def refresh(self):
-        # only run the fs size update, if the _path_name is set.
-        if self._path_name:
-            self.fs_size, self.fs_used, self.fs_percent_used = \
-                self._get_fssize()
-
-    @staticmethod
-    def get_base_dev(dev_name):
-
-        # for intelcas devices, just use the device name as is
-        if dev_name.startswith('intelcas'):
-            device = dev_name
-        elif dev_name.startswith('nvme'):
-            if 'p' in dev_name:
-                device = dev_name[:(dev_name.index('p'))]
-            else:
-                device = dev_name
-        else:
-            # default strip any numeric ie. sdaa1 -> sdaa
-            device = filter(lambda ch: ch.isalpha(), dev_name)
-
-        return device
diff --git a/cephmetrics/collectors/iscsi.py b/cephmetrics/collectors/iscsi.py
deleted file mode 100644 (file)
index 27dbceb..0000000
+++ /dev/null
@@ -1,255 +0,0 @@
-#!/usr/bin/env python2
-
-# requires python-rtslib_fb for LIO interaction
-#
-# NB. the rtslib_fb module is dynamically loaded by the ISCSIGateway
-# class instantiation. This prevents import errors within the generic parent
-# module cephmetrics
-#
-import os
-import sys
-import time
-
-from cephmetrics.collectors import (base, common)
-
-
-class Client(object):
-
-    def __init__(self, iqn):
-        self.iqn = iqn
-        self.name = iqn.replace('.', '-')
-        self.luns = {}
-        self.lun_count = 0
-        self._cycle = 0
-
-    def dump(self):
-        client_dump = {}
-        lun_info = {}
-        client_dump[self.name] = {"luns": {},
-                                  "lun_count": self.lun_count}
-        for lun_name in self.luns:
-            lun = self.luns[lun_name]
-            lun_info.update(lun.dump())
-
-        return {self.name: {"luns": lun_info,
-                            "lun_count": len(lun_info)}
-                }
-
-
-class LUN(object):
-
-    def __init__(self, client, tpg_lun):
-        self._path = tpg_lun.storage_object.path
-        self._tpg_lun = tpg_lun
-        self._name = tpg_lun.storage_object.name
-        self._display_name = tpg_lun.storage_object.name.replace('.', "-")
-        self._so = tpg_lun.storage_object
-        self._client = client
-        self._cycle = 0
-        self.size = 0
-        self.iops = 0
-        self.read_bytes_per_sec = 0
-        self.write_bytes_per_sec = 0
-        self.total_bytes_per_sec = 0
-        self.active_path = 0
-
-    def refresh(self, cycle_id):
-        self._cycle = cycle_id
-        self.size = self._so.size
-        stats_path = os.path.join(self._path, 'statistics/scsi_lu')
-        self.iops = int(common.fread(os.path.join(stats_path, "num_cmds")))
-        read_mb = float(common.fread(os.path.join(stats_path, "read_mbytes")))
-        write_mb = float(
-            common.fread(os.path.join(stats_path, "write_mbytes")))
-        self.read_bytes_per_sec = int(read_mb * 1024 ** 2)
-        self.write_bytes_per_sec = int(write_mb * 1024 ** 2)
-        self.total_bytes_per_sec = self.read_bytes_per_sec + \
-            self.write_bytes_per_sec
-
-        if self._tpg_lun.alua_tg_pt_gp_name == 'ao':
-            self.active_path = 1
-        else:
-            self.active_path = 0
-
-    def dump(self):
-        return {self._display_name: {k: getattr(self, k) for k in self.__dict__
-                                     if not k.startswith("_")}}
-
-
-class ISCSIGateway(base.BaseCollector):
-    """
-    created on a host that has a /sys/kernel/config/target/iscsi dir
-    i.e. there is an iscsi gateway here!
-    """
-
-    metrics = {
-        "lun_count": ("lun_count", "gauge"),
-        "client_count": ("client_count", "gauge"),
-        "tpg_count": ("tpg_count", "gauge"),
-        "sessions": ("sessions", "gauge"),
-        "capacity": ("capacity", "gauge"),
-        "iops": ("iops", "derive"),
-        "read_bytes_per_sec": ("read_bytes_per_sec", "derive"),
-        "write_bytes_per_sec": ("write_bytes_per_sec", "derive"),
-        "total_bytes_per_sec": ("total_bytes_per_sec", "derive")
-    }
-
-    def __init__(self, *args, **kwargs):
-        base.BaseCollector.__init__(self, *args, **kwargs)
-
-        # Since the module can be imported by a parent class but not
-        # instantiated, the rtslib import is deferred until the first instance
-        # of the the class is created. This keeps the parent module simple
-        # and more importantly generic
-        if 'rtslib_fb.root' not in sys.modules.keys():
-
-            try:
-                import rtslib_fb.root as RTSRoot
-            except ImportError:
-                raise ImportError("rtslib_fb python package is missing")
-
-        self._root = RTSRoot()
-
-        self.clients = {}
-        self.cycle = 0
-
-        self.iops = 0
-        self.read_bytes_per_sec = 0
-        self.write_bytes_per_sec = 0
-        self.total_bytes_per_sec = 0
-
-    def refresh(self):
-        """
-        populate the instance by exploring rtslib
-        """
-
-        self.iops = 0
-        self.read_bytes_per_sec = 0
-        self.write_bytes_per_sec = 0
-        self.total_bytes_per_sec = 0
-
-        if self.cycle == 10:
-            self.cycle = 0
-        else:
-            self.cycle += 1
-
-        for node_acl in self._root.node_acls:
-
-            client_name = node_acl.node_wwn
-
-            if client_name not in self.clients:
-                new_client = Client(client_name)
-                self.clients[client_name] = new_client
-
-            client = self.clients[client_name]
-            client.lun_count = 0
-            client._cycle = self.cycle
-
-            for lun in node_acl.mapped_luns:
-                client.lun_count += 1
-                tpg_lun = lun.tpg_lun
-                lun_name = tpg_lun.storage_object.name
-                if lun_name not in client.luns:
-                    lun = LUN(client, tpg_lun)
-                    client.luns[lun._name] = lun
-                else:
-                    lun = client.luns[lun_name]
-
-                lun.refresh(self.cycle)
-
-                self.iops += lun.iops
-                self.read_bytes_per_sec += lun.read_bytes_per_sec
-                self.write_bytes_per_sec += lun.write_bytes_per_sec
-                self.total_bytes_per_sec = self.read_bytes_per_sec + \
-                    self.write_bytes_per_sec
-
-    def prune(self):
-        """
-        drop child objects held by the instance, that are no longer in the
-        iSCSI config i.e. don't report on old information
-        """
-
-        for client_name in self.clients:
-            client = self.clients[client_name]
-
-            for lun_name in client.luns:
-                lun = client.luns[lun_name]
-                if lun._cycle != self.cycle:
-                    # drop the lun entry
-                    self.logger.debug("pruning LUN '{}'".format(lun_name))
-
-                    del client.luns[lun_name]
-
-            if client._cycle != self.cycle:
-                # drop the client entry
-                self.logger.debug("pruning client '{}'".format(client_name))
-                del self.clients[client_name]
-
-    def dump(self):
-
-        gw_stats = {}
-        client_stats = {}
-
-        for metric in ISCSIGateway.metrics:
-            gw_stats[metric] = getattr(self, metric)
-
-        for client_name in self.clients:
-            client = self.clients[client_name]
-            client_stats.update(client.dump())
-
-        return {"iscsi": {
-            "gw_name": {self.gateway_name: 0},
-            "gw_stats": gw_stats,
-            "gw_clients": client_stats
-            }
-        }
-
-    def _get_so(self):
-        return [so for so in self._root.storage_objects]
-
-    def _get_node_acls(self):
-        return [node for node in self._root.node_acls]
-
-    @property
-    def tpg_count(self):
-        return len([tpg for tpg in self._root.tpgs])
-
-    @property
-    def lun_count(self):
-        return len(self._get_so())
-
-    @property
-    def sessions(self):
-        return len([session for session in self._root.sessions])
-
-    @property
-    def gateway_name(self):
-        # Only the 1st gateway is considered/supported
-        gw_iqn = [gw.wwn for gw in self._root.targets][0]
-        return gw_iqn.replace('.', '-')
-
-    @property
-    def client_count(self):
-        return len(self._get_node_acls())
-
-    @property
-    def capacity(self):
-        return sum([so.size for so in self._get_so()])
-
-    def get_stats(self):
-
-        start = time.time()
-
-        # populate gateway instance with the latest configuration from rtslib
-        self.refresh()
-
-        # Overtime they'll be churn in client and disks so we need to drop
-        # any entries from prior runs that are no longer seen in the iscsi
-        # configuration with the prune method
-        self.prune()
-
-        end = time.time()
-
-        self.logger.info("LIO stats took {}s".format(end - start))
-
-        return self.dump()
diff --git a/cephmetrics/collectors/mon.py b/cephmetrics/collectors/mon.py
deleted file mode 100644 (file)
index 411af88..0000000
+++ /dev/null
@@ -1,431 +0,0 @@
-#!/usr/bin/env python
-
-import rados
-import rbd
-import json
-import threading
-import time
-import logging
-
-from cephmetrics.collectors import (base, common)
-
-
class RBDScanner(threading.Thread):
    """Worker thread that counts the rbd images in a single rados pool."""

    def __init__(self, cluster_name, pool_name):
        self.cluster_name = cluster_name    # cluster whose conf file we use
        self.pool_name = pool_name          # pool this thread will scan
        self.num_rbds = 0                   # result, read after join()
        self.logger = logging.getLogger('cephmetrics')

        threading.Thread.__init__(self)

    def run(self):
        """Connect to the cluster and record the pool's rbd image count."""
        image_names = []
        conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
        self.logger.debug("scan of '{}' starting".format(self.pool_name))

        with rados.Rados(conffile=conf_file) as cluster:
            with cluster.open_ioctx(self.pool_name) as ioctx:
                self.logger.debug("listing rbd's in {}".format(self.pool_name))
                image_names = rbd.RBD().list(ioctx)

        self.logger.info("pool scan complete for '{}'".format(self.pool_name))
        self.num_rbds = len(image_names)
-
-
class Mon(base.BaseCollector):
    """Collector for a ceph monitor daemon.

    Gathers cluster-wide health, per-pool rates, osd states and rbd
    counts via the mon's admin socket and mon commands.
    """

    # map ceph's textual health states to numeric severities for graphing;
    # unknown states are mapped to 16 by the lookups that use this table
    health = {
        "HEALTH_OK": 0,
        "HEALTH_WARN": 4,
        "HEALTH_ERR": 8
    }

    # NOTE(review): osd_state is not referenced anywhere in the visible
    # code - confirm it is used elsewhere before removing
    osd_state = {
        "up": 0,
        "down": 1
    }

    # metrics are declared, where each element has a description and collectd
    # data type. The description is used to ensure the names sent by collectd
    # remain the same even if the source name changes in ceph.
    cluster_metrics = {
        "num_mon": ("num_mon", "gauge"),
        "num_mon_quorum": ("num_mon_quorum", "gauge"),
        "num_rbds": ("num_rbds", "gauge"),
        "num_osd_hosts": ("num_osd_hosts", "gauge"),
        "num_osd": ("num_osd", "gauge"),
        "num_osd_up": ("num_osd_up", "gauge"),
        "num_osd_in": ("num_osd_in", "gauge"),
        "osd_epoch": ("osd_epoch", "gauge"),
        "osd_bytes": ("osd_bytes", "gauge"),
        "osd_bytes_used": ("osd_bytes_used", "gauge"),
        "osd_bytes_avail": ("osd_bytes_avail", "gauge"),
        "num_pool": ("num_pool", "gauge"),
        "num_pg": ("num_pg", "gauge"),
        "num_pg_active_clean": ("num_pg_active_clean", "gauge"),
        "num_pg_active": ("num_pg_active", "gauge"),
        "num_pg_peering": ("num_pg_peering", "gauge"),
        "num_object": ("num_object", "gauge"),
        "num_object_degraded": ("num_object_degraded", "gauge"),
        "num_object_misplaced": ("num_object_misplaced", "gauge"),
        "num_object_unfound": ("num_object_unfound", "gauge"),
        "num_bytes": ("num_bytes", "gauge"),
        "num_mds_up": ("num_mds_up", "gauge"),
        "num_mds_in": ("num_mds_in", "gauge"),
        "num_mds_failed": ("num_mds_failed", "gauge"),
        "mds_epoch": ("mds_epoch", "gauge"),
        "health": ("health", "gauge")
    }

    # per-pool client I/O rates ('osd pool stats' client_io_rate section)
    pool_client_metrics = {
        'bytes_sec': ("bytes_sec", "gauge"),
        'op_per_sec': ("op_per_sec", "gauge"),
        'read_bytes_sec': ("read_bytes_sec", "gauge"),
        'write_op_per_sec': ("write_op_per_sec", "gauge"),
        'write_bytes_sec': ("write_bytes_sec", "gauge"),
        'read_op_per_sec': ("read_op_per_sec", "gauge")
    }

    # per-pool recovery rates ('osd pool stats' recovery_rate section)
    pool_recovery_metrics = {
        "recovering_objects_per_sec": ("recovering_objects_per_sec", "gauge"),
        "recovering_bytes_per_sec": ("recovering_bytes_per_sec", "gauge"),
        "recovering_keys_per_sec": ("recovering_keys_per_sec", "gauge"),
        "num_objects_recovered": ("num_objects_recovered", "gauge"),
        "num_bytes_recovered": ("num_bytes_recovered", "gauge"),
        "num_keys_recovered": ("num_keys_recovered", "gauge")
    }

    osd_metrics = {
        "status": ("status", "gauge")
    }

    mon_states = {
        "mon_status": ("mon_status", "gauge")
    }

    # single catalogue of every metric this collector can emit
    all_metrics = common.merge_dicts(
        pool_recovery_metrics, pool_client_metrics)
    all_metrics = common.merge_dicts(all_metrics, cluster_metrics)
    all_metrics = common.merge_dicts(all_metrics, osd_metrics)
    all_metrics = common.merge_dicts(all_metrics, mon_states)
-
-    def __init__(self, *args, **kwargs):
-        base.BaseCollector.__init__(self, *args, **kwargs)
-        self.version = self._get_version()
-        if self.version < 12:
-            self.get_mon_health = self._mon_health
-        else:
-            self.get_mon_health = self._mon_health_new
-
-    def _get_version(self):
-        vers_info = self._mon_command('version')
-        return int(vers_info['version'].replace('.', ' ').split()[2])
-
-    def _mon_command(self, cmd_request):
-        """ Issue a command to the monitor """
-
-        buf_s = '{}'
-        conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
-
-        start = time.time()
-        with rados.Rados(conffile=conf_file) as cluster:
-            cmd = {'prefix': cmd_request, 'format': 'json'}
-            rc, buf_s, out = cluster.mon_command(json.dumps(cmd), b'')
-        end = time.time()
-
-        self.logger.debug("_mon_command call '{}' :"
-                          " {:.3f}s".format(cmd_request,
-                                        (end - start)))
-
-        return json.loads(buf_s)
-
-    @staticmethod
-    def get_feature_state(summary_data, pg_states):
-        """
-        Look at the summary list to determine the state of RADOS features
-        :param summary_data: (list) summary data from a ceph health command
-        :return: (dict) dict indexed by feature
-                        0 Inactive, 1 Active, 2 Disabled
-        """
-        feature_lookup = {"noscrub": "scrub",
-                          "nodeep-scrub": "deep_scrub",
-                          "norecover": "recovery",
-                          "nobackfill": "backfill",
-                          "norebalance": "rebalance",
-                          "noout": "out",
-                          "nodown": "down"}
-
-        # Start with all features inactive i.e. enabled
-        feature_state = {feature_lookup.get(key): 0 for key in feature_lookup}
-
-        for summary in summary_data:
-            summary_desc = summary.get('summary')
-            if "flag(s) set" in summary_desc:
-                flags = summary_desc.replace(' flag(s) set', '').split(',')
-                for disabled_feature in flags:
-                    if disabled_feature in feature_lookup:
-                        feature = feature_lookup.get(disabled_feature)
-                        feature_state[feature] = 2      # feature disabled
-
-        # Now use the current pg state names to determine whether a feature is
-        # active - if not it stays set to '0', which means inactive
-        pg_state_names = [pg_state.get('name') for pg_state in pg_states]
-        for pg_state in pg_state_names:
-            states = pg_state.split('+')
-            if 'recovering' in states:
-                feature_state['recovery'] = 1  # Active
-                continue
-            if 'backfilling' in states:
-                feature_state['backfill'] = 1
-                continue
-            if 'deep' in states:
-                feature_state['deep_scrub'] = 1
-                continue
-            if 'scrubbing' in states:
-                feature_state['scrub'] = 1
-
-        return feature_state
-
-    @classmethod
-    def check_stuck_pgs(cls, summary_list):
-        bad_pg_words = ['pgs', 'stuck', 'inactive']
-        stuck_pgs = 0
-        for summary_data in summary_list:
-            if summary_data.get('severity') != 'HEALTH_ERR':
-                continue
-            if all(trigger in summary_data.get('summary')
-                   for trigger in bad_pg_words):
-                stuck_pgs = int(summary_data.get('summary').split()[0])
-
-        return stuck_pgs
-
-    def _mon_health_new(self):
-
-        cluster, health_data = self._mon_health_common()
-
-        mon_status_output = self._mon_command('mon_status')
-        quorum_list = mon_status_output.get('quorum')
-        mon_list = mon_status_output.get('monmap').get('mons')
-        mon_status = {}
-        for mon in mon_list:
-            state = 0 if mon.get('rank') in quorum_list else 4
-            mon_status[mon.get('name')] = state
-
-        cluster['mon_status'] = mon_status
-
-        return cluster
-
    def _mon_health_common(self):
        """Gather the health/pg data shared by both version handlers.

        :return: (tuple) (cluster metrics dict, raw 'health' command output)
        """

        # for v12 (Luminous and beyond) add the following setting to
        # ceph.conf "mon_health_preluminous_compat=true"
        # this will provide the same output as pre-luminous

        cluster_data = self._admin_socket().get('cluster')
        pg_data = self._mon_command("pg stat")
        health_data = self._mon_command("health")
        # luminous reports 'status'; earlier releases use 'overall_status'
        health_text = health_data.get('overall_status',
                                      health_data.get('status', ''))

        # rename the admin-socket keys to their stable display names
        cluster = {Mon.cluster_metrics[k][0]: cluster_data[k]
                   for k in cluster_data}

        # unknown health strings map to 16, i.e. worse than HEALTH_ERR (8)
        health_num = Mon.health.get(health_text, 16)

        cluster['health'] = health_num

        pg_states = pg_data.get('num_pg_by_state')  # list of dict name,num
        health_summary = health_data.get('summary', [])  # list of issues
        cluster['num_pgs_stuck'] = Mon.check_stuck_pgs(health_summary)
        cluster['features'] = Mon.get_feature_state(health_summary,
                                                    pg_states)

        self.logger.debug(
            'Features:{}'.format(json.dumps(cluster['features'])))

        return cluster, health_data
-
-    def _mon_health(self):
-
-        cluster, health_data = self._mon_health_common()
-
-        services = health_data.get('health').get('health_services')
-        monstats = {}
-        for svc in services:
-            if 'mons' in svc:
-                # Each monitor will have a numeric value denoting health
-                monstats = { mon.get('name'): Mon.health.get(mon.get('health'))
-                             for mon in svc.get('mons')}
-
-        cluster['mon_status'] = monstats
-
-        return cluster
-
-    @classmethod
-    def _seed(cls, metrics):
-        return {metrics[key][0]: 0 for key in metrics}
-
-    def display_names(self, metric_format, metrics):
-        """
-        convert the keys to the static descriptions
-        :return:
-        """
-        return {metric_format[k][0]: metrics[k]
-                for k in metrics} if metrics else {}
-
-    def _get_pool_stats(self):
-        """ get pool stats from rados """
-
-        raw_stats = self._mon_command('osd pool stats')
-        pool_stats = {}
-
-        # process each pool
-        for pool in raw_stats:
-
-            pool_name = pool['pool_name'].replace('.', '_')
-            client_io = self.display_names(Mon.pool_client_metrics,
-                                           pool.get('client_io_rate'))
-            recovery = self.display_names(Mon.pool_recovery_metrics,
-                                          pool.get('recovery_rate'))
-
-            pool_md = {}
-            if client_io:
-
-                # Add pool level aggregation
-                client_io['bytes_sec'] = client_io.get('read_bytes_sec', 0) + \
-                    client_io.get('write_bytes_sec', 0)
-                client_io["op_per_sec"] = client_io.get('read_op_per_sec', 0)+ \
-                    client_io.get('write_op_per_sec', 0)
-                pool_md = client_io
-
-            else:
-                pool_md = Mon._seed(Mon.pool_client_metrics)
-
-            if recovery:
-                pool_md = common.merge_dicts(pool_md, recovery)
-            else:
-                pool_md = common.merge_dicts(pool_md, Mon._seed(
-                    Mon.pool_recovery_metrics))
-
-            pool_stats[pool_name] = pool_md
-
-        return pool_stats
-
-    def _get_osd_states(self):
-
-        self.logger.debug("fetching osd states from the local mon")
-        raw = self._mon_command('osd dump')
-        osd_hosts = set()
-        osds = {}
-        for osd in raw.get('osds'):
-            cluster_addr = osd.get('cluster_addr').split(':')[0]
-            osd_hosts.add(cluster_addr)
-
-            # NB. The key for the osds dict must be a string as the dict is
-            # flattened when the metric name is derived in the parent collectd
-            # module. If it is not converted, you get a TypeError
-            osds[str(osd.get('osd'))] = {"up": osd.get('up'),
-                                         "in": osd.get('in')}
-
-        return len(osd_hosts), osds
-
-    @staticmethod
-    def _select_pools(pools, mons):
-        """
-        determine the pools this mon should scan based on it's name. We select
-        pools from the an offset into the pool list, and then repeat at an
-        interval set by # mons in the configuration. This splits up the pools
-        we have, so each mon looks at a discrete set of pools instead of all
-        mons performing all scans.
-        :param pools: (list) rados pool names
-        :param mons: (list) monitor names from ceph health
-        :return: (list) of pools this monitor should scan. empty list if the
-                 monitor name mismatches - so no scans done
-        """
-
-        pools_to_scan = []
-
-        try:
-            freq = mons.index(common.get_hostname())
-        except ValueError:
-            # this host's name is not in the monitor list?
-            # twilight zone moment
-            pass
-        else:
-
-            pools_to_scan = [pools[ptr]
-                             for ptr in xrange(freq, len(pools), len(mons))]
-
-        return pools_to_scan
-
-    def get_pools(self):
-        skip_pools = ('default.rgw')
-
-        start = time.time()
-        conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
-        with rados.Rados(conffile=conf_file) as cluster:
-            rados_pools = sorted(cluster.list_pools())
-        end = time.time()
-
-        self.logger.debug('lspools took {:.3f}s'.format(end - start))
-
-        filtered_pools = [pool for pool in rados_pools
-                          if not pool.startswith(skip_pools)]
-
-        return filtered_pools
-
-    def _get_rbds(self, monitors):
-
-        pool_list = self.get_pools()
-        mon_list = sorted(monitors.keys())
-        my_pools = Mon._select_pools(pool_list, mon_list)
-        self.logger.debug("Pools to be scanned on this mon"
-                          " : {}".format(','.join(my_pools)))
-        threads = []
-
-        start = time.time()
-
-        for pool in my_pools:
-            thread = RBDScanner(self.cluster_name, pool)
-            thread.start()
-            threads.append(thread)
-
-        # wait for all threads to complete
-        for thread in threads:
-            thread.join(1)
-
-        end = time.time()
-        self.logger.debug("rbd scans {:.3f}s".format((end - start)))
-
-        total_rbds = sum([thread.num_rbds for thread in threads])
-        self.logger.debug("total rbds found : {}".format(total_rbds))
-
-        for thread in threads:
-            del thread
-
-        return total_rbds
-
-    def get_stats(self):
-        """
-        method associated with the plugin callback to gather the metrics
-        :return: (dict) metadata describing the state of the mon/osd's
-        """
-
-        start = time.time()
-
-        pool_stats = self._get_pool_stats()
-        num_osd_hosts, osd_states = self._get_osd_states()
-        cluster_state = self.get_mon_health()
-        cluster_state['num_osd_hosts'] = num_osd_hosts
-        cluster_state['num_rbds'] = self._get_rbds(cluster_state['mon_status'])
-
-        all_stats = common.merge_dicts(cluster_state, {"pools": pool_stats,
-                                                "osd_state": osd_states})
-
-        end = time.time()
-        self.logger.info("mon get_stats call : {:.3f}s".format((end - start)))
-
-        return {"mon": all_stats}
-
diff --git a/cephmetrics/collectors/osd.py b/cephmetrics/collectors/osd.py
deleted file mode 100644 (file)
index c95727b..0000000
+++ /dev/null
@@ -1,332 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import time
-import math
-
-from cephmetrics.collectors import (base, common)
-
-__author__ = "Paul Cuzner"
-
-
class OSDstats(object):
    """Holds per-OSD capacity data and derived filestore latencies.

    update() must be fed cumulative perf-dump samples; latencies are the
    average over the interval between the last two samples.
    """

    osd_capacity = {
        "stat_bytes": ("stat_bytes", "gauge"),
        "stat_bytes_used": ("stat_bytes_used", "gauge"),
        "stat_bytes_avail": ("stat_bytes_avail", "gauge")
    }

    # perf-dump counters we derive interval latencies from
    filestore_metrics = {
        "journal_latency",
        "commitcycle_latency",
        "apply_latency",
        "queue_transaction_latency_avg"
    }

    def __init__(self, osd_type='filestore'):
        self._current = {}      # most recent filestore sample
        self._previous = {}     # prior sample (empty until 2nd update)
        self._osd_type = osd_type
        self.osd_percent_used = 0

    def update(self, stats):
        """
        update the objects attributes based on the dict
        :param stats: (dict) containing filestore performance ('filestore')
               and capacity info ('osd')
        :return: None
        """

        # rotate the samples (the original duplicated this assignment in
        # both branches of an if/else)
        if self._current:
            self._previous = self._current
        self._current = stats['filestore']

        for attr in OSDstats.filestore_metrics:
            val = 0
            if self._previous:
                d_sum = (self._current[attr].get('sum') -
                         self._previous[attr].get('sum'))
                d_avgcount = (self._current[attr].get('avgcount') -
                              self._previous[attr].get('avgcount'))

                # counters are cumulative, so a zero delta means no activity
                if d_sum != 0 and d_avgcount != 0:
                    val = float(d_sum) / d_avgcount

            setattr(self, attr, val)

        for attr in stats['osd']:
            setattr(self, attr, stats['osd'].get(attr))

        self.osd_percent_used = math.ceil((float(self.stat_bytes_used) /
                                           self.stat_bytes) * 100)
-
-
class OSDs(base.BaseCollector):
    """Collector for the OSD daemons and their backing devices on this host."""

    # combined catalogue: disk metadata plus iostat counters
    all_metrics = common.merge_dicts(
        common.Disk.metrics, common.IOstat.metrics)

    def __init__(self, cluster_name, **kwargs):
        base.BaseCollector.__init__(self, cluster_name, **kwargs)
        # anchors the first /proc/diskstats sampling interval
        self.timestamp = int(time.time())

        # NOTE(review): self.osd is keyed by BOTH osd_id (-> OSDstats) and
        # device name (-> common.Disk) - see _dev_to_osd; confirm intended
        self.osd = {}          # dict of disk objects, each disk contains osd_id
        self.jrnl = {}      # dict of journal devices (if not collocated)
        self.osd_id_list = []
        self.dev_lookup = {}    # dict dev_name -> osd | jrnl
        self.osd_count = 0
-
-    def __repr__(self):
-
-        s = ''
-        for disk in self.osd:
-            s += "{}\n".format(disk)
-            dev = self.osd[disk]
-
-            for var in vars(dev):
-                if not var.startswith('_'):
-                    s += "{} ... {}\n".format(var, getattr(dev, var))
-        return s
-
-    def _fetch_osd_stats(self, osd_id):
-
-        # NB: osd stats are cumulative
-
-        stats = {}
-        osd_socket_name = '/var/run/ceph/{}-osd.{}.asok'.format(
-            self.cluster_name, osd_id)
-
-        if not os.path.exists(osd_socket_name):
-            # all OSD's should expose an admin socket, so if it's missing
-            # this node has a problem!
-            raise IOError("Socket file missing for OSD {}".format(osd_id))
-
-        self.logger.debug("fetching osd stats for osd {}".format(osd_id))
-        resp = self._admin_socket(socket_path=osd_socket_name)
-
-        filestore_stats = resp.get('filestore')
-        stats['filestore'] = {key_name: filestore_stats.get(key_name)
-                              for key_name in OSDstats.filestore_metrics}
-
-        osd_stats = resp.get('osd')
-
-        # Add disk usage stats
-        stats['osd'] = {key_name: osd_stats.get(key_name)
-                        for key_name in OSDstats.osd_capacity.keys()}
-
-        return stats
-
-    @staticmethod
-    def get_osd_type(osd_path):
-
-        osd_type_fname = os.path.join(osd_path, 'type')
-        if os.path.exists(osd_type_fname):
-            return common.fread(osd_type_fname)
-        else:
-            if os.path.exists(os.path.join(osd_path, 'journal')):
-                return "filestore"
-            else:
-                raise ValueError("Unrecognised OSD type")
-
    def _dev_to_osd(self):
        """
        Look at the system to determine which disks are acting as OSD's
        """

        # the logic here uses the mount points to determine which OSD's are
        # in the system. The encryption state is determined just by the use of
        # devicemapper (i.e. /dev/mapper prefixed devices) - since at this time
        # this is all dm is used for.

        osd_indicators = {'var', 'lib', 'osd'}

        for mnt in common.freadlines('/proc/mounts'):
            items = mnt.split(' ')
            dev_path, path_name = items[:2]
            if path_name.startswith('/var/lib'):
                # take a close look since this is where ceph osds usually
                # get mounted
                dirs = set(path_name.split('/'))
                if dirs.issuperset(osd_indicators):

                    # get the osd_id from the name is the most simple way
                    # to get the id, due to naming conventions. If this fails
                    # though, plan 'b' is the whoami file
                    osd_id = path_name.split('-')[-1]
                    if not osd_id.isdigit():
                        osd_id = common.fread(
                            os.path.join(path_name, 'whoami'))

                    if osd_id not in self.osd:
                        osd_type = OSDs.get_osd_type(path_name)
                        self.osd[osd_id] = OSDstats(osd_type=osd_type)
                        self.osd_id_list.append(osd_id)

                        osd_type = self.osd[osd_id]._osd_type
                        if osd_type == 'filestore':
                            # dm-crypt devices are resolved back to their
                            # real block device through by-partuuid
                            if dev_path.startswith('/dev/mapper'):
                                encrypted = 1
                                uuid = dev_path.split('/')[-1]
                                partuuid = '/dev/disk/by-partuuid/{}'.format(
                                    uuid)
                                dev_path = os.path.realpath(partuuid)
                                osd_device = dev_path.split('/')[-1]
                            else:
                                encrypted = 0
                                osd_device = dev_path.split('/')[-1]

                        elif osd_type == 'bluestore':
                            # bluestore's data device is behind the 'block'
                            # symlink in the OSD directory
                            block_link = os.path.join(path_name, 'block')
                            osd_path = os.path.realpath(block_link)
                            osd_device = osd_path.split('/')[-1]
                            encrypted = 0
                        else:
                            raise ValueError("Unknown OSD type encountered")

                        # if the osd_id hasn't been seen, neither has the
                        # disk - note self.osd now holds the device entry
                        # alongside the osd_id entry added above
                        self.osd[osd_device] = common.Disk(
                            osd_device,
                            path_name=path_name,
                            osd_id=osd_id,
                            in_osd_type=osd_type,
                            encrypted=encrypted)
                        self.dev_lookup[osd_device] = 'osd'
                        self.osd_count += 1

                        # filestore journals and bluestore WALs may live on
                        # a separate device - track that too
                        if osd_type == 'filestore':
                            journal_link = os.path.join(path_name, 'journal')
                        else:
                            journal_link = os.path.join(path_name, 'block.wal')

                        if os.path.exists(journal_link):
                            link_tgt = os.readlink(journal_link)
                            if link_tgt.startswith('/dev/mapper'):
                                encrypted = 1
                            else:
                                encrypted = 0

                            partuuid_path = os.path.join(
                                '/dev/disk/by-partuuid',
                                link_tgt.split('/')[-1])
                            jrnl_path = os.path.realpath(partuuid_path)
                            jrnl_dev = jrnl_path.split('/')[-1]

                            # NOTE(review): membership is tested against
                            # self.osd (i.e. journal collocated on an OSD
                            # data disk), not self.jrnl - confirm intended
                            if jrnl_dev not in self.osd:
                                self.jrnl[jrnl_dev] = common.Disk(
                                    jrnl_dev,
                                    osd_id=osd_id,
                                    in_osd_type=osd_type,
                                    encrypted=encrypted)

                                self.dev_lookup[jrnl_dev] = 'jrnl'

                        else:
                            # No journal or WAL link..?
                            pass
-
    def _stats_lookup(self):
        """
        Grab the disk stats from /proc/diskstats, and the key osd perf dump
        counters
        """

        now = time.time()
        # the interval since the previous sample drives the per-second rates
        interval = int(now) - self.timestamp
        self.timestamp = int(now)

        # Fetch diskstats from the OS
        for perf_entry in common.freadlines('/proc/diskstats'):

            field = perf_entry.split()
            dev_name = field[2]

            # only devices discovered by _dev_to_osd are of interest
            device = None
            if self.dev_lookup.get(dev_name, None) == 'osd':
                device = self.osd[dev_name]
            elif self.dev_lookup.get(dev_name, None) == 'jrnl':
                device = self.jrnl[dev_name]

            if device:
                # the fields after the device name are cumulative counters
                new_stats = field[3:]

                # rotate the samples so compute() can work on the delta
                if device.perf._current:
                    device.perf._previous = device.perf._current
                    device.perf._current = new_stats
                else:
                    device.perf._current = new_stats

                device.perf.compute(interval)
                device.refresh()

        end = time.time()
        self.logger.debug("OS disk stats calculated in "
                          "{:.4f}s".format(end-now))

        # fetch stats from each osd daemon
        osd_stats_start = time.time()
        for osd_id in self.osd_id_list:

            # only filestore OSDs expose the perf counters we consume
            if self.osd[osd_id]._osd_type == 'filestore':
                osd_stats = self._fetch_osd_stats(osd_id)

                # self.logger.debug('stats : {}'.format(osd_stats))

                osd_device = self.osd[osd_id]
                osd_device.update(osd_stats)
            else:
                self.logger.debug("skipped 'bluestore' osd perf collection "
                                  "for osd.{}".format(osd_id))

        osd_stats_end = time.time()
        self.logger.debug(
            "OSD perf dump stats collected for {} OSDs in {:.3f}s".format(
                len(self.osd_id_list), (osd_stats_end - osd_stats_start)))
-
-    @staticmethod
-    def _dump_devs(device_dict):
-
-        dumped = {}
-
-        for dev_name in sorted(device_dict):
-            device = device_dict[dev_name]
-            dumped[dev_name] = common.todict(device)
-
-        return dumped
-
-    def dump(self):
-        """
-        dump the osd object(s) to a dict. The object *must* not have references
-        to other objects - if this rule is broken cephmetrics caller will fail
-        when parsing the dict
-
-        :return: (dict) dictionary representation of this OSDs on this host
-        """
-
-        return {
-            "num_osds": self.osd_count,
-            "osd": OSDs._dump_devs(self.osd),
-            "jrnl": OSDs._dump_devs(self.jrnl)
-        }
-
-    def get_stats(self):
-
-        start = time.time()
-
-        self._dev_to_osd()
-        self._stats_lookup()
-
-        end = time.time()
-
-        self.logger.info("osd get_stats call "
-                         ": {:.3f}s".format((end - start)))
-
-        return self.dump()
diff --git a/cephmetrics/collectors/rgw.py b/cephmetrics/collectors/rgw.py
deleted file mode 100644 (file)
index 5dbdab3..0000000
+++ /dev/null
@@ -1,73 +0,0 @@
-#!/usr/bin/env python
-
-import time
-
-from cephmetrics.collectors import (base, common)
-
-__author__ = "paul.cuzner@redhat.com"
-
-
class RGW(base.BaseCollector):
    """Collector for a local rados gateway's admin-socket counters."""

    simple_metrics = {
        "req": ("requests", "derive"),
        "failed_req": ("requests_failed", "derive"),
        "get": ("gets", "derive"),
        "get_b": ("get_bytes", "derive"),
        "put": ("puts", "derive"),
        "put_b": ("put_bytes", "derive"),
        "qlen": ("qlen", "derive"),
        "qactive": ("requests_active", "derive")
    }

    # counters that arrive as {'sum': .., 'avgcount': ..} sub-dicts
    int_latencies = [
        "get_initial_lat",
        "put_initial_lat"
    ]

    latencies = {
        "get_initial_lat_sum": ("get_initial_lat_sum", "derive"),
        "get_initial_lat_avgcount": ("get_initial_lat_avgcount", "derive"),
        "put_initial_lat_sum": ("put_initial_lat_sum", "derive"),
        "put_initial_lat_avgcount": ("put_initial_lat_avgcount", "derive")
    }

    all_metrics = common.merge_dicts(simple_metrics, latencies)

    def __init__(self, cluster_name, admin_socket, **kwargs):
        """Record the short hostname used to locate our perf-dump section."""
        base.BaseCollector.__init__(self, cluster_name, admin_socket, **kwargs)
        self.host_name = common.get_hostname()

    def _get_rgw_data(self):
        """Return this host's 'client.rgw.<host>' section of the perf dump."""
        response = self._admin_socket()
        return response.get('client.rgw.{}'.format(self.host_name))

    def _filter(self, stats):
        """Flatten the raw perf dump down to the metrics we publish."""

        # pick out the simple metrics
        filtered = {key: stats[key] for key in RGW.simple_metrics}

        # flatten the latency sub-dicts into '<metric>_<field>' keys
        for key in RGW.int_latencies:
            latency = stats[key]
            for _attr in latency:
                filtered["{}_{}".format(key, _attr)] = latency.get(_attr)

        return filtered

    def get_stats(self):
        """Fetch, filter and return the rgw stats, keyed by 'rgw'."""

        start = time.time()
        stats = self._filter(self._get_rgw_data())
        elapsed = time.time() - start

        self.logger.info("RGW get_stats : {:.3f}s".format(elapsed))

        return {"rgw": stats}
diff --git a/cephmetrics_collectors/__init__.py b/cephmetrics_collectors/__init__.py
new file mode 100644 (file)
index 0000000..a5abc98
--- /dev/null
@@ -0,0 +1,54 @@
#!/usr/bin/env python
"""cephmetrics collector package.

Exposes the Ceph probe object plus the per-role collector modules
(mon, osd, rgw, iscsi).
"""
import glob
import os
import sys

# Remove collectd's plugin dir from sys.path to avoid namespace collisions with
# .so files that are not actually Python modules
sys.path = [item for item in sys.path if not item.endswith('/collectd')]

# NB: this import must stay below the sys.path scrub above
from . import (common, iscsi, mon, osd, rgw)
+
+
class Ceph(object):
    """Top-level object holding one collector per ceph role on this host."""

    def __init__(self):
        self.cluster_name = None            # set by the caller before probe()
        self.host_name = common.get_hostname()

        self.mon_socket = None
        self.rgw_socket = None

        # collector instances, populated by probe() when the role is present
        self.mon = None
        self.rgw = None
        self.osd = None
        self.iscsi = None

    def probe(self):
        """
        set up which collector(s) to use, based on what types of sockets we
        find in /var/run/ceph
        """

        mon_socket = '/var/run/ceph/{}-mon.{}.asok'.format(self.cluster_name,
                                                           self.host_name)
        if os.path.exists(mon_socket):
            self.mon_socket = mon_socket
            self.mon = mon.Mon(self.cluster_name, admin_socket=mon_socket)

        rgw_socket_list = glob.glob('/var/run/ceph/{}-client.rgw.*.'
                                    'asok'.format(self.cluster_name))
        if rgw_socket_list:
            self.rgw = rgw.RGW(self.cluster_name,
                               admin_socket=rgw_socket_list[0])

        osd_socket_list = glob.glob('/var/run/ceph/{}-osd.*'
                                    '.asok'.format(self.cluster_name))
        # an OSD may be mounted while its daemon is down - check mounts too
        mounted = common.freadlines('/proc/mounts')
        osds_mounted = [mnt for mnt in mounted
                        if mnt.split()[1].startswith('/var/lib/ceph')]
        if osd_socket_list or osds_mounted:
            self.osd = osd.OSDs(self.cluster_name)

        if os.path.exists('/sys/kernel/config/target/iscsi'):
            self.iscsi = iscsi.ISCSIGateway(self.cluster_name)
diff --git a/cephmetrics_collectors/base.py b/cephmetrics_collectors/base.py
new file mode 100644 (file)
index 0000000..f2f2295
--- /dev/null
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+
+import json
+import time
+import logging
+
+from ceph_daemon import admin_socket
+
+
+class BaseCollector(object):
+    """
+    Common base for all cephmetrics collectors: holds the cluster name,
+    a shared logger and admin-socket access.
+    """
+
+    def __init__(self, cluster_name, admin_socket=None):
+        # :param cluster_name: (str) ceph cluster name (e.g. 'ceph')
+        # :param admin_socket: (str) default admin socket path for the daemon
+        self.cluster_name = cluster_name
+        self.admin_socket = admin_socket
+
+        self.logger = logging.getLogger('cephmetrics')
+
+    def _admin_socket(self, cmds=None, socket_path=None):
+        """
+        Issue a command over a ceph admin socket and decode the response.
+        :param cmds: (list) command words; defaults to ['perf', 'dump']
+        :param socket_path: (str) override for self.admin_socket
+        :return: (dict) parsed JSON response from the daemon
+        """
+
+        adm_socket = self.admin_socket if not socket_path else socket_path
+
+        if not cmds:
+            cmds = ['perf', 'dump']
+
+        # timed so slow daemons show up in the debug log
+        start = time.time()
+        response = admin_socket(adm_socket, cmds,
+                                format='json')
+        end = time.time()
+
+        self.logger.debug("admin_socket call '{}' : "
+                          "{:.3f}s".format(' '.join(cmds),
+                                           (end - start)))
+
+        return json.loads(response)
+
+    def get_stats(self):
+        """Return this collector's metrics; subclasses override this."""
+
+        return {}
diff --git a/cephmetrics_collectors/common.py b/cephmetrics_collectors/common.py
new file mode 100644 (file)
index 0000000..8b94dcc
--- /dev/null
@@ -0,0 +1,247 @@
+#!/usr/bin/env python
+
+
+import socket
+from os import statvfs
+import math
+
+
+def get_hostname():
+    """Return the short (unqualified) hostname of this machine."""
+    return socket.gethostname().split('.')[0]
+
+
+def add_dicts(dict1, dict2):
+    """
+    Add dictionary values together
+    :param dict1: (dict) numeric-valued dict
+    :param dict2: (dict) numeric-valued dict
+    :return: dict with matching fields sum'd together (missing keys count
+             as 0)
+    """
+    return {key: dict1.get(key, 0) + dict2.get(key, 0)
+            for key in set(dict1).union(dict2)}
+
+
+def merge_dicts(dict1, dict2):
+    """
+    merges two dicts together to form a single dict. when dict keys overlap
+    the value in the 2nd dict takes precedence. Neither input is mutated.
+    :param dict1: (dict)
+    :param dict2: (dict)
+    :return: combined dict
+    """
+
+    new = dict1.copy()
+    new.update(dict2)
+
+    return new
+
+
+def flatten_dict(data, separator='.', prefix=''):
+    """
+    flatten a dict, so it is just simple key/value pairs. Nested dicts are
+    recursed into, with their keys joined by 'separator'
+    (e.g. {'a': {'b': 1}} -> {'a.b': 1}).
+    :param data: (dict) possibly-nested dict; non-dict input is returned
+                 keyed by 'prefix'
+    :param separator: (str) char to use when combining keys
+    :param prefix: key prefix
+    :return: (dict) flat key/value mapping
+    """
+    return {prefix + separator + k if prefix else k: v
+            for kk, vv in data.items()
+            for k, v in flatten_dict(vv, separator, kk).items()
+            } if isinstance(data, dict) else {prefix: data}
+
+
+def todict(obj):
+    """
+    convert an object to a dict representation, recursing into attribute
+    values that are themselves objects. Attributes whose names start with
+    '_' are treated as private and skipped.
+
+    NOTE(review): uses dict.iteritems(), so this module is python2-only
+    as written.
+    :param obj: (object) object to examine, to extract variables/values from
+    :return: (dict) representation of the given object
+    """
+    data = {}
+    for key, value in obj.__dict__.iteritems():
+
+        if key.startswith('_'):
+            continue
+
+        try:
+            # recurse if the value looks like an object (has __dict__)
+            data[key] = todict(value)
+        except AttributeError:
+            data[key] = value
+
+    return data
+
+
+def fread(file_name=None):
+    """
+    Simple read function for files of a single value
+    :param file_name: (str) file name to read
+    :return: (str) contents of the file, with trailing whitespace stripped
+    """
+
+    with open(file_name, 'r') as f:
+        setting = f.read().rstrip()
+    return setting
+
+
+def freadlines(file_name=None):
+    """
+    simple readlines function to return all records of a given file
+    :param file_name: (str) file name to read
+    :return: (list) lines of the file (newlines retained)
+    """
+
+    with open(file_name, 'r') as f:
+        data = f.readlines()
+    return data
+
+
+class IOstat(object):
+    """
+    Per-device I/O statistics derived from deltas between two samples of
+    raw disk counters. The raw samples (lists of counter strings) are
+    loaded into _previous/_current by the owner of the instance - not
+    shown in this class.
+
+    NOTE(review): the attribute name 'await' becomes a reserved keyword in
+    python 3.7+, so this class is python2-only as written.
+    """
+
+    # raw counter names; positions are assumed to match the order of the
+    # sample lists placed in _previous/_current (diskstats-style columns)
+    raw_metrics = [
+        "_reads",
+        "_reads_mrgd",
+        "_sectors_read",
+        "_read_ms",
+        "_writes",
+        "_writes_mrgd",
+        "_sectors_written",
+        "_write_ms",
+        "_current_io",
+        "_ms_active_io",
+        "_ms_active_io_w"
+    ]
+
+    # bytes per sector used when converting sector counts to byte rates
+    sector_size = 512
+
+    # computed metric name -> (collectd description, collectd data type)
+    metrics = {
+        "iops": ("iops", "gauge"),
+        "r_iops": ("r_iops", "gauge"),
+        "w_iops": ("w_iops", "gauge"),
+        "bytes_per_sec": ("bytes_per_sec", "gauge"),
+        "r_bytes_per_sec": ("r_bytes_per_sec", "gauge"),
+        "w_bytes_per_sec": ("w_bytes_per_sec", "gauge"),
+        "util": ("util", "gauge"),
+        "await": ("await", "gauge"),
+        "r_await": ("r_await", "gauge"),
+        "w_await": ("w_await", "gauge"),
+    }
+
+    def __init__(self):
+        self._previous = []
+        self._current = []
+
+        # Seed the metrics we're interested in
+        for ctr in IOstat.metrics.keys():
+            setattr(self, ctr, 0)
+
+    def __str__(self):
+        # debug-friendly dump of every instance attribute, sorted by name
+        s = '\n- IOstat object:\n'
+        for key in sorted(vars(self)):
+            s += '\t{} ... {}\n'.format(key, getattr(self, key))
+        return s
+
+    def _calc_raw_delta(self):
+        """Set each raw metric attr to current-minus-previous (0 on the
+        first sample, when there is no previous data)."""
+        if not self._previous:
+            # nothing to compute yet
+            for ptr in range(len(IOstat.raw_metrics)):
+                key = IOstat.raw_metrics[ptr]
+                setattr(self, key, 0)
+        else:
+            for ptr in range(len(IOstat.raw_metrics)):
+                key = IOstat.raw_metrics[ptr]
+                setattr(self, key, (int(self._current[ptr]) -
+                                    int(self._previous[ptr])))
+
+    def compute(self, sample_interval):
+        """
+        Calculate the iostats for this device
+        :param sample_interval: (num) seconds between the two samples; no
+               metrics are updated when it is <= 0
+        """
+
+        self._calc_raw_delta()
+
+        if sample_interval > 0:
+            interval_ms = sample_interval * 1000
+            total_io = self._reads + self._writes
+            # %% of the interval the device had I/O in flight
+            self.util = float(self._ms_active_io) / interval_ms * 100
+            self.iops = int(total_io) / sample_interval
+            self.r_iops = int(self._reads) / sample_interval
+            self.w_iops = int(self._writes) / sample_interval
+            # *await values are mean ms per I/O, guarded against divide-by-0
+            self.await = (float(self._write_ms + self._read_ms) / total_io if
+                          total_io > 0 else 0)
+            self.w_await = float(
+                self._write_ms) / self._writes if self._writes > 0 else 0
+            self.r_await = float(
+                self._read_ms) / self._reads if self._reads > 0 else 0
+            self.r_bytes_per_sec = (float(
+                self._sectors_read * IOstat.sector_size)) / sample_interval
+            self.w_bytes_per_sec = (float(
+                self._sectors_written * IOstat.sector_size)) / sample_interval
+            self.bytes_per_sec = self.r_bytes_per_sec + self.w_bytes_per_sec
+
+
+class Disk(object):
+    """
+    A block device backing an OSD: static properties read from sysfs
+    (size, rotational) plus filesystem usage refreshed via statvfs.
+    """
+
+    # metric name -> (collectd description, collectd data type)
+    metrics = {
+        "rotational": ("rotational", "gauge"),
+        "disk_size": ("disk_size", "gauge"),
+        "fs_size": ("fs_size", "gauge"),
+        "fs_used": ("fs_used", "gauge"),
+        "fs_percent_used": ("fs_percent_used", "gauge"),
+        "osd_id": ("osd_id", "gauge")
+    }
+
+    # numeric encoding of the osd objectstore type
+    osd_types = {"filestore": 0, "bluestore": 1}
+
+    def __init__(self, device_name, path_name=None, osd_id=None,
+                 in_osd_type="filestore", encrypted=0):
+        # :param device_name: (str) device name, e.g. 'sda1' or 'nvme0n1p2'
+        # :param path_name: (str) mount point of the OSD filesystem; fs
+        #        usage stats are only gathered when this is set
+        # :param osd_id: osd id owning this device
+        # :param in_osd_type: (str) 'filestore' or 'bluestore'
+        # :param encrypted: (int) 1 if the device is encrypted
+
+        self._name = device_name
+        self._path_name = path_name
+        self._base_dev = Disk.get_base_dev(device_name)
+        self.osd_id = osd_id
+
+        self.rotational = self._get_rota()
+        self.disk_size = self._get_size()
+        self.perf = IOstat()
+        self.fs_size = 0
+        self.fs_percent_used = 0
+        self.fs_used = 0
+        self.encrypted = encrypted
+        self.osd_type = Disk.osd_types[in_osd_type]
+
+        self.refresh()
+
+    def _get_size(self):
+        # sysfs size is in 512-byte sectors regardless of logical block size
+        return int(fread("/sys/block/{}/size".format(self._base_dev))) * 512
+
+    def _get_rota(self):
+        # 1 = spinning disk, 0 = non-rotational (ssd/nvme)
+        return int(
+            fread("/sys/block/{}/queue/rotational".format(self._base_dev)))
+
+    def _get_fssize(self):
+        """Return (fs_size, fs_used, fs_percent_used) for the OSD mount."""
+        # statvfs the 'whoami' file inside the osd dir - presumably so a
+        # stale/unmounted path fails fast rather than reporting the parent fs
+        s = statvfs("{}/whoami".format(self._path_name))
+        fs_size = s.f_blocks * s.f_bsize
+        fs_used = fs_size - (s.f_bfree * s.f_bsize)
+        fs_percent_used = math.ceil((float(fs_used) / fs_size) * 100)
+        return fs_size, fs_used, fs_percent_used
+
+    def refresh(self):
+        # only run the fs size update, if the _path_name is set.
+        if self._path_name:
+            self.fs_size, self.fs_used, self.fs_percent_used = \
+                self._get_fssize()
+
+    @staticmethod
+    def get_base_dev(dev_name):
+        """
+        Map a partition/device name to its parent block device name, as
+        used under /sys/block.
+        :param dev_name: (str) e.g. 'sdaa1', 'nvme0n1p2', 'intelcas1-1'
+        :return: (str) base device name, e.g. 'sdaa', 'nvme0n1'
+        """
+
+        # for intelcas devices, just use the device name as is
+        if dev_name.startswith('intelcas'):
+            device = dev_name
+        elif dev_name.startswith('nvme'):
+            # strip the partition suffix: nvme0n1p2 -> nvme0n1
+            if 'p' in dev_name:
+                device = dev_name[:(dev_name.index('p'))]
+            else:
+                device = dev_name
+        else:
+            # default strip any numeric ie. sdaa1 -> sdaa
+            # NOTE(review): relies on python2 filter() returning a str;
+            # on python3 this would return a filter object
+            device = filter(lambda ch: ch.isalpha(), dev_name)
+
+        return device
diff --git a/cephmetrics_collectors/iscsi.py b/cephmetrics_collectors/iscsi.py
new file mode 100644 (file)
index 0000000..c93c60f
--- /dev/null
@@ -0,0 +1,255 @@
+#!/usr/bin/env python2
+
+# requires python-rtslib_fb for LIO interaction
+#
+# NB. the rtslib_fb module is dynamically loaded by the ISCSIGateway
+# class instantiation. This prevents import errors within the generic parent
+# module cephmetrics
+#
+import os
+import sys
+import time
+
+from cephmetrics_collectors import (base, common)
+
+
+class Client(object):
+    """
+    An iSCSI initiator (client), identified by IQN, holding the LUNs
+    mapped to it. The '.' chars in the IQN are replaced by '-' so the
+    name is safe to use in a metric path.
+    """
+
+    def __init__(self, iqn):
+        self.iqn = iqn
+        self.name = iqn.replace('.', '-')
+        self.luns = {}
+        self.lun_count = 0
+        # scan-cycle marker, used by ISCSIGateway.prune() to age out clients
+        self._cycle = 0
+
+    def dump(self):
+        """Return {client-name: {"luns": {...}, "lun_count": n}} for all
+        LUNs mapped to this client."""
+        # NOTE(review): client_dump is built but never used - dead code
+        client_dump = {}
+        lun_info = {}
+        client_dump[self.name] = {"luns": {},
+                                  "lun_count": self.lun_count}
+        for lun_name in self.luns:
+            lun = self.luns[lun_name]
+            lun_info.update(lun.dump())
+
+        return {self.name: {"luns": lun_info,
+                            "lun_count": len(lun_info)}
+                }
+
+
+class LUN(object):
+    """
+    A LUN mapped to a client, wrapping an rtslib tpg_lun object and
+    exposing its LIO configfs statistics.
+    """
+
+    def __init__(self, client, tpg_lun):
+        # :param client: (Client) owning initiator
+        # :param tpg_lun: rtslib tpg_lun object for this mapping
+        self._path = tpg_lun.storage_object.path
+        self._tpg_lun = tpg_lun
+        self._name = tpg_lun.storage_object.name
+        self._display_name = tpg_lun.storage_object.name.replace('.', "-")
+        self._so = tpg_lun.storage_object
+        self._client = client
+        # scan-cycle marker, used by ISCSIGateway.prune() to age out LUNs
+        self._cycle = 0
+        self.size = 0
+        self.iops = 0
+        self.read_bytes_per_sec = 0
+        self.write_bytes_per_sec = 0
+        self.total_bytes_per_sec = 0
+        self.active_path = 0
+
+    def refresh(self, cycle_id):
+        """
+        Re-read this LUN's stats from the configfs statistics dir.
+        NOTE(review): num_cmds and *_mbytes are cumulative counters, so
+        despite the *_per_sec names these hold totals; presumably collectd's
+        'derive' data type (see ISCSIGateway.metrics) turns them into rates.
+        :param cycle_id: (int) current scan cycle, recorded for pruning
+        """
+        self._cycle = cycle_id
+        self.size = self._so.size
+        stats_path = os.path.join(self._path, 'statistics/scsi_lu')
+        self.iops = int(common.fread(os.path.join(stats_path, "num_cmds")))
+        read_mb = float(common.fread(os.path.join(stats_path, "read_mbytes")))
+        write_mb = float(
+            common.fread(os.path.join(stats_path, "write_mbytes")))
+        self.read_bytes_per_sec = int(read_mb * 1024 ** 2)
+        self.write_bytes_per_sec = int(write_mb * 1024 ** 2)
+        self.total_bytes_per_sec = self.read_bytes_per_sec + \
+            self.write_bytes_per_sec
+
+        # ALUA group 'ao' (active/optimized) means this gw owns the path
+        if self._tpg_lun.alua_tg_pt_gp_name == 'ao':
+            self.active_path = 1
+        else:
+            self.active_path = 0
+
+    def dump(self):
+        # public (non-underscore) attrs only, keyed by the display name
+        return {self._display_name: {k: getattr(self, k) for k in self.__dict__
+                                     if not k.startswith("_")}}
+
+
+class ISCSIGateway(base.BaseCollector):
+    """
+    created on a host that has a /sys/kernel/config/target/iscsi dir
+    i.e. there is an iscsi gateway here!
+    """
+
+    # metric name -> (collectd description, collectd data type); the I/O
+    # counters are 'derive' since the underlying values are cumulative
+    metrics = {
+        "lun_count": ("lun_count", "gauge"),
+        "client_count": ("client_count", "gauge"),
+        "tpg_count": ("tpg_count", "gauge"),
+        "sessions": ("sessions", "gauge"),
+        "capacity": ("capacity", "gauge"),
+        "iops": ("iops", "derive"),
+        "read_bytes_per_sec": ("read_bytes_per_sec", "derive"),
+        "write_bytes_per_sec": ("write_bytes_per_sec", "derive"),
+        "total_bytes_per_sec": ("total_bytes_per_sec", "derive")
+    }
+
+    def __init__(self, *args, **kwargs):
+        base.BaseCollector.__init__(self, *args, **kwargs)
+
+        # Since the module can be imported by a parent class but not
+        # instantiated, the rtslib import is deferred until the first instance
+        # of the class is created. This keeps the parent module simple
+        # and more importantly generic
+        if 'rtslib_fb.root' not in sys.modules.keys():
+
+            try:
+                import rtslib_fb.root as RTSRoot
+            except ImportError:
+                raise ImportError("rtslib_fb python package is missing")
+
+        # NOTE(review): 'import rtslib_fb.root as RTSRoot' binds the
+        # *module*, not the RTSRoot class, so RTSRoot() here would raise
+        # TypeError; and when rtslib_fb.root is already in sys.modules the
+        # name RTSRoot is never bound at all (NameError). Probably intended:
+        # 'from rtslib_fb.root import RTSRoot' - confirm and fix.
+        self._root = RTSRoot()
+
+        self.clients = {}
+        # scan cycle counter (0-10), used to age out stale clients/LUNs
+        self.cycle = 0
+
+        self.iops = 0
+        self.read_bytes_per_sec = 0
+        self.write_bytes_per_sec = 0
+        self.total_bytes_per_sec = 0
+
+    def refresh(self):
+        """
+        populate the instance by exploring rtslib
+        """
+
+        # gateway totals are rebuilt from scratch on every scan
+        self.iops = 0
+        self.read_bytes_per_sec = 0
+        self.write_bytes_per_sec = 0
+        self.total_bytes_per_sec = 0
+
+        if self.cycle == 10:
+            self.cycle = 0
+        else:
+            self.cycle += 1
+
+        for node_acl in self._root.node_acls:
+
+            client_name = node_acl.node_wwn
+
+            if client_name not in self.clients:
+                new_client = Client(client_name)
+                self.clients[client_name] = new_client
+
+            client = self.clients[client_name]
+            client.lun_count = 0
+            # stamp with the current cycle so prune() keeps this client
+            client._cycle = self.cycle
+
+            for lun in node_acl.mapped_luns:
+                client.lun_count += 1
+                tpg_lun = lun.tpg_lun
+                lun_name = tpg_lun.storage_object.name
+                # 'lun' is rebound below from mapped_lun to our LUN wrapper
+                if lun_name not in client.luns:
+                    lun = LUN(client, tpg_lun)
+                    client.luns[lun._name] = lun
+                else:
+                    lun = client.luns[lun_name]
+
+                lun.refresh(self.cycle)
+
+                # roll the LUN counters up into the gateway totals
+                self.iops += lun.iops
+                self.read_bytes_per_sec += lun.read_bytes_per_sec
+                self.write_bytes_per_sec += lun.write_bytes_per_sec
+                self.total_bytes_per_sec = self.read_bytes_per_sec + \
+                    self.write_bytes_per_sec
+
+    def prune(self):
+        """
+        drop child objects held by the instance, that are no longer in the
+        iSCSI config i.e. don't report on old information
+
+        NOTE(review): deleting entries from self.clients / client.luns while
+        iterating over those same dicts raises RuntimeError ('dictionary
+        changed size during iteration'); iterate over list(...) copies.
+        """
+
+        for client_name in self.clients:
+            client = self.clients[client_name]
+
+            for lun_name in client.luns:
+                lun = client.luns[lun_name]
+                if lun._cycle != self.cycle:
+                    # drop the lun entry
+                    self.logger.debug("pruning LUN '{}'".format(lun_name))
+
+                    del client.luns[lun_name]
+
+            if client._cycle != self.cycle:
+                # drop the client entry
+                self.logger.debug("pruning client '{}'".format(client_name))
+                del self.clients[client_name]
+
+    def dump(self):
+        """Return the gateway metrics tree keyed under 'iscsi'."""
+
+        gw_stats = {}
+        client_stats = {}
+
+        for metric in ISCSIGateway.metrics:
+            gw_stats[metric] = getattr(self, metric)
+
+        for client_name in self.clients:
+            client = self.clients[client_name]
+            client_stats.update(client.dump())
+
+        return {"iscsi": {
+            "gw_name": {self.gateway_name: 0},
+            "gw_stats": gw_stats,
+            "gw_clients": client_stats
+            }
+        }
+
+    def _get_so(self):
+        # all LIO storage objects (backstores) on this gateway
+        return [so for so in self._root.storage_objects]
+
+    def _get_node_acls(self):
+        # all initiator ACLs defined on this gateway
+        return [node for node in self._root.node_acls]
+
+    @property
+    def tpg_count(self):
+        return len([tpg for tpg in self._root.tpgs])
+
+    @property
+    def lun_count(self):
+        return len(self._get_so())
+
+    @property
+    def sessions(self):
+        return len([session for session in self._root.sessions])
+
+    @property
+    def gateway_name(self):
+        # Only the 1st gateway is considered/supported
+        gw_iqn = [gw.wwn for gw in self._root.targets][0]
+        return gw_iqn.replace('.', '-')
+
+    @property
+    def client_count(self):
+        return len(self._get_node_acls())
+
+    @property
+    def capacity(self):
+        # total provisioned bytes across all backstores
+        return sum([so.size for so in self._get_so()])
+
+    def get_stats(self):
+        """Scan LIO, prune stale entries and return the metrics dict."""
+
+        start = time.time()
+
+        # populate gateway instance with the latest configuration from rtslib
+        self.refresh()
+
+        # Over time there'll be churn in clients and disks so we need to drop
+        # any entries from prior runs that are no longer seen in the iscsi
+        # configuration with the prune method
+        self.prune()
+
+        end = time.time()
+
+        self.logger.info("LIO stats took {}s".format(end - start))
+
+        return self.dump()
diff --git a/cephmetrics_collectors/mon.py b/cephmetrics_collectors/mon.py
new file mode 100644 (file)
index 0000000..3d75238
--- /dev/null
@@ -0,0 +1,431 @@
+#!/usr/bin/env python
+
+import rados
+import rbd
+import json
+import threading
+import time
+import logging
+
+from cephmetrics_collectors import (base, common)
+
+
+class RBDScanner(threading.Thread):
+    """
+    Worker thread that counts the rbd images in a single pool; the result
+    is left in self.num_rbds for the caller to read after join().
+    """
+
+    def __init__(self, cluster_name, pool_name):
+        # :param cluster_name: (str) ceph cluster name
+        # :param pool_name: (str) rados pool to scan
+        self.cluster_name = cluster_name
+        self.pool_name = pool_name
+        self.num_rbds = 0
+        self.logger = logging.getLogger('cephmetrics')
+
+        threading.Thread.__init__(self)
+
+    def run(self):
+        """Thread body: list the rbd images in the pool and record the count."""
+        rbd_images = []
+        conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
+        self.logger.debug("scan of '{}' starting".format(self.pool_name))
+        # context managers ensure the cluster handle/ioctx are torn down
+        with rados.Rados(conffile=conf_file) as cluster:
+            with cluster.open_ioctx(self.pool_name) as ioctx:
+                rbd_inst = rbd.RBD()
+                self.logger.debug("listing rbd's in {}".format(self.pool_name))
+                rbd_images = rbd_inst.list(ioctx)
+
+        self.logger.info("pool scan complete for '{}'".format(self.pool_name))
+        self.num_rbds = len(rbd_images)
+
+
+class Mon(base.BaseCollector):
+    """
+    Collector run on a monitor host: gathers cluster health, pool stats,
+    osd states and rbd counts via the mon admin socket and mon commands.
+    """
+
+    # map ceph health strings to numeric severities for graphing
+    health = {
+        "HEALTH_OK": 0,
+        "HEALTH_WARN": 4,
+        "HEALTH_ERR": 8
+    }
+
+    osd_state = {
+        "up": 0,
+        "down": 1
+    }
+
+    # metrics are declared, where each element has a description and collectd
+    # data type. The description is used to ensure the names sent by collectd
+    # remain the same even if the source name changes in ceph.
+    cluster_metrics = {
+        "num_mon": ("num_mon", "gauge"),
+        "num_mon_quorum": ("num_mon_quorum", "gauge"),
+        "num_rbds": ("num_rbds", "gauge"),
+        "num_osd_hosts": ("num_osd_hosts", "gauge"),
+        "num_osd": ("num_osd", "gauge"),
+        "num_osd_up": ("num_osd_up", "gauge"),
+        "num_osd_in": ("num_osd_in", "gauge"),
+        "osd_epoch": ("osd_epoch", "gauge"),
+        "osd_bytes": ("osd_bytes", "gauge"),
+        "osd_bytes_used": ("osd_bytes_used", "gauge"),
+        "osd_bytes_avail": ("osd_bytes_avail", "gauge"),
+        "num_pool": ("num_pool", "gauge"),
+        "num_pg": ("num_pg", "gauge"),
+        "num_pg_active_clean": ("num_pg_active_clean", "gauge"),
+        "num_pg_active": ("num_pg_active", "gauge"),
+        "num_pg_peering": ("num_pg_peering", "gauge"),
+        "num_object": ("num_object", "gauge"),
+        "num_object_degraded": ("num_object_degraded", "gauge"),
+        "num_object_misplaced": ("num_object_misplaced", "gauge"),
+        "num_object_unfound": ("num_object_unfound", "gauge"),
+        "num_bytes": ("num_bytes", "gauge"),
+        "num_mds_up": ("num_mds_up", "gauge"),
+        "num_mds_in": ("num_mds_in", "gauge"),
+        "num_mds_failed": ("num_mds_failed", "gauge"),
+        "mds_epoch": ("mds_epoch", "gauge"),
+        "health": ("health", "gauge")
+    }
+
+    pool_client_metrics = {
+        'bytes_sec': ("bytes_sec", "gauge"),
+        'op_per_sec': ("op_per_sec", "gauge"),
+        'read_bytes_sec': ("read_bytes_sec", "gauge"),
+        'write_op_per_sec': ("write_op_per_sec", "gauge"),
+        'write_bytes_sec': ("write_bytes_sec", "gauge"),
+        'read_op_per_sec': ("read_op_per_sec", "gauge")
+    }
+
+    pool_recovery_metrics = {
+        "recovering_objects_per_sec": ("recovering_objects_per_sec", "gauge"),
+        "recovering_bytes_per_sec": ("recovering_bytes_per_sec", "gauge"),
+        "recovering_keys_per_sec": ("recovering_keys_per_sec", "gauge"),
+        "num_objects_recovered": ("num_objects_recovered", "gauge"),
+        "num_bytes_recovered": ("num_bytes_recovered", "gauge"),
+        "num_keys_recovered": ("num_keys_recovered", "gauge")
+    }
+
+    osd_metrics = {
+        "status": ("status", "gauge")
+    }
+
+    mon_states = {
+        "mon_status": ("mon_status", "gauge")
+    }
+
+    # union of every metric group, used by callers that need the full set
+    all_metrics = common.merge_dicts(
+        pool_recovery_metrics, pool_client_metrics)
+    all_metrics = common.merge_dicts(all_metrics, cluster_metrics)
+    all_metrics = common.merge_dicts(all_metrics, osd_metrics)
+    all_metrics = common.merge_dicts(all_metrics, mon_states)
+
+    def __init__(self, *args, **kwargs):
+        base.BaseCollector.__init__(self, *args, **kwargs)
+        self.version = self._get_version()
+        # the health command output changed in v12 (Luminous), so pick the
+        # matching parser once, up front
+        if self.version < 12:
+            self.get_mon_health = self._mon_health
+        else:
+            self.get_mon_health = self._mon_health_new
+
+    def _get_version(self):
+        """Return the ceph major version as an int."""
+        vers_info = self._mon_command('version')
+        # assumes a 'ceph version X.Y.Z ...' style string, so that word
+        # index 2 after dot-splitting is the major - TODO confirm format
+        return int(vers_info['version'].replace('.', ' ').split()[2])
+
+    def _mon_command(self, cmd_request):
+        """ Issue a command to the monitor """
+
+        buf_s = '{}'
+        conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
+
+        start = time.time()
+        with rados.Rados(conffile=conf_file) as cluster:
+            cmd = {'prefix': cmd_request, 'format': 'json'}
+            rc, buf_s, out = cluster.mon_command(json.dumps(cmd), b'')
+        end = time.time()
+
+        self.logger.debug("_mon_command call '{}' :"
+                          " {:.3f}s".format(cmd_request,
+                                        (end - start)))
+
+        return json.loads(buf_s)
+
+    @staticmethod
+    def get_feature_state(summary_data, pg_states):
+        """
+        Look at the summary list to determine the state of RADOS features
+        :param summary_data: (list) summary data from a ceph health command
+        :param pg_states: (list) dicts of pg state name/num pairs
+        :return: (dict) dict indexed by feature
+                        0 Inactive, 1 Active, 2 Disabled
+        """
+        # 'noX' cluster flag -> feature name reported in the metrics
+        feature_lookup = {"noscrub": "scrub",
+                          "nodeep-scrub": "deep_scrub",
+                          "norecover": "recovery",
+                          "nobackfill": "backfill",
+                          "norebalance": "rebalance",
+                          "noout": "out",
+                          "nodown": "down"}
+
+        # Start with all features inactive i.e. enabled
+        feature_state = {feature_lookup.get(key): 0 for key in feature_lookup}
+
+        for summary in summary_data:
+            summary_desc = summary.get('summary')
+            if "flag(s) set" in summary_desc:
+                flags = summary_desc.replace(' flag(s) set', '').split(',')
+                for disabled_feature in flags:
+                    if disabled_feature in feature_lookup:
+                        feature = feature_lookup.get(disabled_feature)
+                        feature_state[feature] = 2      # feature disabled
+
+        # Now use the current pg state names to determine whether a feature is
+        # active - if not it stays set to '0', which means inactive
+        pg_state_names = [pg_state.get('name') for pg_state in pg_states]
+        for pg_state in pg_state_names:
+            # pg state names are '+'-joined, e.g. 'active+recovering'
+            states = pg_state.split('+')
+            if 'recovering' in states:
+                feature_state['recovery'] = 1  # Active
+                continue
+            if 'backfilling' in states:
+                feature_state['backfill'] = 1
+                continue
+            if 'deep' in states:
+                feature_state['deep_scrub'] = 1
+                continue
+            if 'scrubbing' in states:
+                feature_state['scrub'] = 1
+
+        return feature_state
+
+    @classmethod
+    def check_stuck_pgs(cls, summary_list):
+        """Return the stuck/inactive pg count from HEALTH_ERR summaries
+        (0 when none are reported)."""
+        bad_pg_words = ['pgs', 'stuck', 'inactive']
+        stuck_pgs = 0
+        for summary_data in summary_list:
+            if summary_data.get('severity') != 'HEALTH_ERR':
+                continue
+            # the count is the leading word of e.g. 'N pgs stuck inactive'
+            if all(trigger in summary_data.get('summary')
+                   for trigger in bad_pg_words):
+                stuck_pgs = int(summary_data.get('summary').split()[0])
+
+        return stuck_pgs
+
+    def _mon_health_new(self):
+        """Health gatherer for v12+ (Luminous): derives per-mon status from
+        quorum membership in 'mon_status' output."""
+
+        # NOTE(review): health_data from _mon_health_common is unused here
+        cluster, health_data = self._mon_health_common()
+
+        mon_status_output = self._mon_command('mon_status')
+        quorum_list = mon_status_output.get('quorum')
+        mon_list = mon_status_output.get('monmap').get('mons')
+        mon_status = {}
+        for mon in mon_list:
+            # 0 = in quorum, 4 = out of quorum (matches HEALTH_WARN weight)
+            state = 0 if mon.get('rank') in quorum_list else 4
+            mon_status[mon.get('name')] = state
+
+        cluster['mon_status'] = mon_status
+
+        return cluster
+
+    def _mon_health_common(self):
+        """Build the cluster metrics dict shared by both health paths.
+        :return: (tuple) cluster metrics dict, raw 'health' command output
+        """
+
+        # for v12 (Luminous and beyond) add the following setting to
+        # ceph.conf "mon_health_preluminous_compat=true"
+        # this will provide the same output as pre-luminous
+
+        cluster_data = self._admin_socket().get('cluster')
+        pg_data = self._mon_command("pg stat")
+        health_data = self._mon_command("health")
+        # pre-luminous reports 'overall_status', luminous+ reports 'status'
+        health_text = health_data.get('overall_status',
+                                      health_data.get('status', ''))
+
+        cluster = {Mon.cluster_metrics[k][0]: cluster_data[k]
+                   for k in cluster_data}
+
+        # 16 = unknown health string (worse than HEALTH_ERR's 8)
+        health_num = Mon.health.get(health_text, 16)
+
+        cluster['health'] = health_num
+
+        pg_states = pg_data.get('num_pg_by_state')  # list of dict name,num
+        health_summary = health_data.get('summary', [])  # list of issues
+        cluster['num_pgs_stuck'] = Mon.check_stuck_pgs(health_summary)
+        cluster['features'] = Mon.get_feature_state(health_summary,
+                                                    pg_states)
+
+        self.logger.debug(
+            'Features:{}'.format(json.dumps(cluster['features'])))
+
+        return cluster, health_data
+
+    def _mon_health(self):
+        """Health gatherer for pre-v12 releases: per-mon status comes from
+        the 'health_services' section of the health output."""
+
+        cluster, health_data = self._mon_health_common()
+
+        services = health_data.get('health').get('health_services')
+        monstats = {}
+        for svc in services:
+            if 'mons' in svc:
+                # Each monitor will have a numeric value denoting health
+                monstats = { mon.get('name'): Mon.health.get(mon.get('health'))
+                             for mon in svc.get('mons')}
+
+        cluster['mon_status'] = monstats
+
+        return cluster
+
+    @classmethod
+    def _seed(cls, metrics):
+        """Return a dict of the given metrics' display names, all set to 0."""
+        return {metrics[key][0]: 0 for key in metrics}
+
+    def display_names(self, metric_format, metrics):
+        """
+        convert the keys to the static descriptions
+        :param metric_format: (dict) metric name -> (description, type)
+        :param metrics: (dict) raw metric values (may be None/empty)
+        :return: (dict) values rekeyed by description; {} for empty input
+        """
+        return {metric_format[k][0]: metrics[k]
+                for k in metrics} if metrics else {}
+
+    def _get_pool_stats(self):
+        """ get pool stats from rados """
+
+        raw_stats = self._mon_command('osd pool stats')
+        pool_stats = {}
+
+        # process each pool
+        for pool in raw_stats:
+
+            # '.' would split the metric path, so rewrite it to '_'
+            pool_name = pool['pool_name'].replace('.', '_')
+            client_io = self.display_names(Mon.pool_client_metrics,
+                                           pool.get('client_io_rate'))
+            recovery = self.display_names(Mon.pool_recovery_metrics,
+                                          pool.get('recovery_rate'))
+
+            pool_md = {}
+            if client_io:
+
+                # Add pool level aggregation
+                client_io['bytes_sec'] = client_io.get('read_bytes_sec', 0) + \
+                    client_io.get('write_bytes_sec', 0)
+                client_io["op_per_sec"] = client_io.get('read_op_per_sec', 0)+ \
+                    client_io.get('write_op_per_sec', 0)
+                pool_md = client_io
+
+            else:
+                # idle pool - emit explicit zeros so the metric never vanishes
+                pool_md = Mon._seed(Mon.pool_client_metrics)
+
+            if recovery:
+                pool_md = common.merge_dicts(pool_md, recovery)
+            else:
+                pool_md = common.merge_dicts(pool_md, Mon._seed(
+                    Mon.pool_recovery_metrics))
+
+            pool_stats[pool_name] = pool_md
+
+        return pool_stats
+
+    def _get_osd_states(self):
+        """Return (num distinct osd hosts, {osd id: {'up','in'} states})."""
+
+        self.logger.debug("fetching osd states from the local mon")
+        raw = self._mon_command('osd dump')
+        osd_hosts = set()
+        osds = {}
+        for osd in raw.get('osds'):
+            # one entry per distinct cluster-network IP = one osd host
+            cluster_addr = osd.get('cluster_addr').split(':')[0]
+            osd_hosts.add(cluster_addr)
+
+            # NB. The key for the osds dict must be a string as the dict is
+            # flattened when the metric name is derived in the parent collectd
+            # module. If it is not converted, you get a TypeError
+            osds[str(osd.get('osd'))] = {"up": osd.get('up'),
+                                         "in": osd.get('in')}
+
+        return len(osd_hosts), osds
+
+    @staticmethod
+    def _select_pools(pools, mons):
+        """
+        determine the pools this mon should scan based on its name. We select
+        pools from an offset into the pool list, and then repeat at an
+        interval set by # mons in the configuration. This splits up the pools
+        we have, so each mon looks at a discrete set of pools instead of all
+        mons performing all scans.
+        :param pools: (list) rados pool names
+        :param mons: (list) monitor names from ceph health
+        :return: (list) of pools this monitor should scan. empty list if the
+                 monitor name mismatches - so no scans done
+        """
+
+        pools_to_scan = []
+
+        try:
+            freq = mons.index(common.get_hostname())
+        except ValueError:
+            # this host's name is not in the monitor list?
+            # twilight zone moment
+            pass
+        else:
+
+            # NOTE(review): xrange is python2-only
+            pools_to_scan = [pools[ptr]
+                             for ptr in xrange(freq, len(pools), len(mons))]
+
+        return pools_to_scan
+
+    def get_pools(self):
+        """Return the sorted rados pool names, minus internal rgw pools."""
+        # NOTE(review): ('default.rgw') is a plain string, not a tuple -
+        # works here since startswith accepts a string, but confirm intent
+        skip_pools = ('default.rgw')
+
+        start = time.time()
+        conf_file = "/etc/ceph/{}.conf".format(self.cluster_name)
+        with rados.Rados(conffile=conf_file) as cluster:
+            rados_pools = sorted(cluster.list_pools())
+        end = time.time()
+
+        self.logger.debug('lspools took {:.3f}s'.format(end - start))
+
+        filtered_pools = [pool for pool in rados_pools
+                          if not pool.startswith(skip_pools)]
+
+        return filtered_pools
+
+    def _get_rbds(self, monitors):
+        """
+        Count the rbd images in this mon's share of the pools, scanning
+        each pool in its own RBDScanner thread.
+        :param monitors: (dict) mon name -> status, from get_mon_health
+        :return: (int) total rbd images found by this mon's scans
+        """
+
+        pool_list = self.get_pools()
+        mon_list = sorted(monitors.keys())
+        my_pools = Mon._select_pools(pool_list, mon_list)
+        self.logger.debug("Pools to be scanned on this mon"
+                          " : {}".format(','.join(my_pools)))
+        threads = []
+
+        start = time.time()
+
+        for pool in my_pools:
+            thread = RBDScanner(self.cluster_name, pool)
+            thread.start()
+            threads.append(thread)
+
+        # wait for all threads to complete
+        # NOTE(review): join(1) only waits 1s per thread - a slow scan may
+        # still be running and report a stale num_rbds of 0; confirm intent
+        for thread in threads:
+            thread.join(1)
+
+        end = time.time()
+        self.logger.debug("rbd scans {:.3f}s".format((end - start)))
+
+        total_rbds = sum([thread.num_rbds for thread in threads])
+        self.logger.debug("total rbds found : {}".format(total_rbds))
+
+        for thread in threads:
+            del thread
+
+        return total_rbds
+
+    def get_stats(self):
+        """
+        method associated with the plugin callback to gather the metrics
+        :return: (dict) metadata describing the state of the mon/osd's
+        """
+
+        start = time.time()
+
+        pool_stats = self._get_pool_stats()
+        num_osd_hosts, osd_states = self._get_osd_states()
+        cluster_state = self.get_mon_health()
+        cluster_state['num_osd_hosts'] = num_osd_hosts
+        cluster_state['num_rbds'] = self._get_rbds(cluster_state['mon_status'])
+
+        all_stats = common.merge_dicts(cluster_state, {"pools": pool_stats,
+                                                "osd_state": osd_states})
+
+        end = time.time()
+        self.logger.info("mon get_stats call : {:.3f}s".format((end - start)))
+
+        return {"mon": all_stats}
+
diff --git a/cephmetrics_collectors/osd.py b/cephmetrics_collectors/osd.py
new file mode 100644 (file)
index 0000000..9dfb364
--- /dev/null
@@ -0,0 +1,332 @@
+#!/usr/bin/env python
+
+import os
+import time
+import math
+
+from cephmetrics_collectors import (base, common)
+
+__author__ = "Paul Cuzner"
+
+
class OSDstats(object):
    """
    Holds per-OSD performance and capacity metrics.

    The latency counters in the osd's perf dump are cumulative, so two
    consecutive samples are kept and each update() exposes the average
    latency over the sampling interval as plain attributes.
    """

    # capacity counters taken from the 'osd' section of the perf dump
    # metric name -> (collectd name, collectd type)
    osd_capacity = {
        "stat_bytes": ("stat_bytes", "gauge"),
        "stat_bytes_used": ("stat_bytes_used", "gauge"),
        "stat_bytes_avail": ("stat_bytes_avail", "gauge")
    }

    # cumulative sum/avgcount latency counters from the 'filestore' section
    filestore_metrics = {
        "journal_latency",
        "commitcycle_latency",
        "apply_latency",
        "queue_transaction_latency_avg"
    }

    def __init__(self, osd_type='filestore'):
        self._current = {}      # most recent filestore sample
        self._previous = {}     # prior filestore sample (empty on 1st pass)
        self._osd_type = osd_type
        self.osd_percent_used = 0

    def update(self, stats):
        """
        update the objects attributes based on the dict
        :param stats: (dict) containing filestore performance ('filestore')
               and capacity info ('osd')
        :return: None
        """

        # rotate the samples; on the first call there is no previous one
        # (the original duplicated the _current assignment in both branches)
        if self._current:
            self._previous = self._current
        self._current = stats['filestore']

        for attr in OSDstats.filestore_metrics:

            if self._previous:
                d_sum = self._current[attr].get('sum') - \
                    self._previous[attr].get('sum')
                d_avgcount = self._current[attr].get('avgcount') - \
                    self._previous[attr].get('avgcount')

                # avoid a divide-by-zero when no ops ran this interval
                if d_sum == 0 or d_avgcount == 0:
                    val = 0
                else:
                    val = float(d_sum) / d_avgcount
            else:
                # no previous value, so set to 0
                val = 0

            setattr(self, attr, val)

        # expose the capacity counters directly as attributes
        for attr in stats['osd']:
            setattr(self, attr, stats['osd'].get(attr))

        self.osd_percent_used = math.ceil((float(self.stat_bytes_used) /
                                           self.stat_bytes) * 100)
+
+
+class OSDs(base.BaseCollector):
+    """
+    Collect stats for every OSD hosted on this node.
+
+    OSD data (and journal/WAL) devices are discovered by scanning
+    /proc/mounts, then OS level disk stats from /proc/diskstats are
+    combined with each osd daemon's own perf dump counters fetched
+    over its admin socket.
+    """
+
+    all_metrics = common.merge_dicts(
+        common.Disk.metrics, common.IOstat.metrics)
+
+    def __init__(self, cluster_name, **kwargs):
+        base.BaseCollector.__init__(self, cluster_name, **kwargs)
+        # time of the previous sample; used to derive the interval that
+        # rate metrics are averaged over (see _stats_lookup)
+        self.timestamp = int(time.time())
+
+        # NOTE: self.osd is dual-keyed - OSDstats objects keyed by osd_id
+        # and common.Disk objects keyed by device name (see _dev_to_osd)
+        self.osd = {}          # dict of disk objects, each disk contains osd_id
+        self.jrnl = {}      # dict of journal devices (if not collocated)
+        self.osd_id_list = []
+        self.dev_lookup = {}    # dict dev_name -> osd | jrnl
+        self.osd_count = 0
+
+    def __repr__(self):
+        """Render every tracked osd entry and its public attributes."""
+
+        s = ''
+        for disk in self.osd:
+            s += "{}\n".format(disk)
+            dev = self.osd[disk]
+
+            for var in vars(dev):
+                if not var.startswith('_'):
+                    s += "{} ... {}\n".format(var, getattr(dev, var))
+        return s
+
+    def _fetch_osd_stats(self, osd_id):
+        """
+        Read the perf counters for one osd daemon from its admin socket.
+
+        :param osd_id: (str) id of the osd daemon
+        :return: (dict) 'filestore' latency counters plus 'osd' capacity
+                 counters, filtered to the keys OSDstats understands
+        :raises IOError: when the osd's admin socket file is missing
+        """
+
+        # NB: osd stats are cumulative
+
+        stats = {}
+        osd_socket_name = '/var/run/ceph/{}-osd.{}.asok'.format(
+            self.cluster_name, osd_id)
+
+        if not os.path.exists(osd_socket_name):
+            # all OSD's should expose an admin socket, so if it's missing
+            # this node has a problem!
+            raise IOError("Socket file missing for OSD {}".format(osd_id))
+
+        self.logger.debug("fetching osd stats for osd {}".format(osd_id))
+        resp = self._admin_socket(socket_path=osd_socket_name)
+
+        filestore_stats = resp.get('filestore')
+        stats['filestore'] = {key_name: filestore_stats.get(key_name)
+                              for key_name in OSDstats.filestore_metrics}
+
+        osd_stats = resp.get('osd')
+
+        # Add disk usage stats
+        stats['osd'] = {key_name: osd_stats.get(key_name)
+                        for key_name in OSDstats.osd_capacity.keys()}
+
+        return stats
+
+    @staticmethod
+    def get_osd_type(osd_path):
+        """
+        Determine the object store type backing an osd data directory.
+
+        :param osd_path: (str) mount point of the osd's data directory
+        :return: (str) contents of the 'type' file when present,
+                 otherwise 'filestore' if a 'journal' entry exists
+        :raises ValueError: when neither indicator is found
+        """
+
+        osd_type_fname = os.path.join(osd_path, 'type')
+        if os.path.exists(osd_type_fname):
+            return common.fread(osd_type_fname)
+        else:
+            # older filestore layouts have no 'type' file, but do have
+            # a 'journal' entry
+            if os.path.exists(os.path.join(osd_path, 'journal')):
+                return "filestore"
+            else:
+                raise ValueError("Unrecognised OSD type")
+
+    def _dev_to_osd(self):
+        """
+        Look at the system to determine which disks are acting as OSD's
+        """
+
+        # the logic here uses the mount points to determine which OSD's are
+        # in the system. The encryption state is determine just by the use
+        # devicemapper (i.e. /dev/mapper prefixed devices) - since at this time
+        # this is all dm is used for.
+
+        osd_indicators = {'var', 'lib', 'osd'}
+
+        for mnt in common.freadlines('/proc/mounts'):
+            items = mnt.split(' ')
+            dev_path, path_name = items[:2]
+            if path_name.startswith('/var/lib'):
+                # take a close look since this is where ceph osds usually
+                # get mounted
+                dirs = set(path_name.split('/'))
+                if dirs.issuperset(osd_indicators):
+
+                    # get the osd_id from the name is the most simple way
+                    # to get the id, due to naming conventions. If this fails
+                    # though, plan 'b' is the whoami file
+                    osd_id = path_name.split('-')[-1]
+                    if not osd_id.isdigit():
+                        osd_id = common.fread(
+                            os.path.join(path_name, 'whoami'))
+
+                    if osd_id not in self.osd:
+                        osd_type = OSDs.get_osd_type(path_name)
+                        self.osd[osd_id] = OSDstats(osd_type=osd_type)
+                        self.osd_id_list.append(osd_id)
+
+                        osd_type = self.osd[osd_id]._osd_type
+                        if osd_type == 'filestore':
+                            if dev_path.startswith('/dev/mapper'):
+                                # dm device -> treated as encrypted;
+                                # resolve the partuuid link back to the
+                                # underlying block device
+                                encrypted = 1
+                                uuid = dev_path.split('/')[-1]
+                                partuuid = '/dev/disk/by-partuuid/{}'.format(
+                                    uuid)
+                                dev_path = os.path.realpath(partuuid)
+                                osd_device = dev_path.split('/')[-1]
+                            else:
+                                encrypted = 0
+                                osd_device = dev_path.split('/')[-1]
+
+                        elif osd_type == 'bluestore':
+                            block_link = os.path.join(path_name, 'block')
+                            osd_path = os.path.realpath(block_link)
+                            osd_device = osd_path.split('/')[-1]
+                            encrypted = 0
+                        else:
+                            raise ValueError("Unknown OSD type encountered")
+
+                        # if the osd_id hasn't been seem neither has the
+                        # disk
+                        self.osd[osd_device] = common.Disk(
+                            osd_device,
+                            path_name=path_name,
+                            osd_id=osd_id,
+                            in_osd_type=osd_type,
+                            encrypted=encrypted)
+                        self.dev_lookup[osd_device] = 'osd'
+                        self.osd_count += 1
+
+                        if osd_type == 'filestore':
+                            journal_link = os.path.join(path_name, 'journal')
+                        else:
+                            journal_link = os.path.join(path_name, 'block.wal')
+
+                        if os.path.exists(journal_link):
+                            link_tgt = os.readlink(journal_link)
+                            if link_tgt.startswith('/dev/mapper'):
+                                encrypted = 1
+                            else:
+                                encrypted = 0
+
+                            partuuid_path = os.path.join(
+                                '/dev/disk/by-partuuid',
+                                link_tgt.split('/')[-1])
+                            jrnl_path = os.path.realpath(partuuid_path)
+                            jrnl_dev = jrnl_path.split('/')[-1]
+
+                            # NOTE(review): membership is only checked
+                            # against self.osd - presumably to skip a
+                            # journal collocated on an osd data device;
+                            # confirm self.jrnl shouldn't be checked too
+                            if jrnl_dev not in self.osd:
+                                self.jrnl[jrnl_dev] = common.Disk(
+                                    jrnl_dev,
+                                    osd_id=osd_id,
+                                    in_osd_type=osd_type,
+                                    encrypted=encrypted)
+
+                                self.dev_lookup[jrnl_dev] = 'jrnl'
+
+                        else:
+                            # No journal or WAL link..?
+                            pass
+
+    def _stats_lookup(self):
+        """
+        Grab the disk stats from /proc/diskstats, and the key osd perf dump
+        counters
+        """
+
+        now = time.time()
+        interval = int(now) - self.timestamp
+        self.timestamp = int(now)
+
+        # Fetch diskstats from the OS
+        for perf_entry in common.freadlines('/proc/diskstats'):
+
+            field = perf_entry.split()
+            dev_name = field[2]
+
+            # only devices previously mapped by _dev_to_osd are of interest
+            device = None
+            if self.dev_lookup.get(dev_name, None) == 'osd':
+                device = self.osd[dev_name]
+            elif self.dev_lookup.get(dev_name, None) == 'jrnl':
+                device = self.jrnl[dev_name]
+
+            if device:
+                new_stats = field[3:]
+
+                # rotate the samples, then derive rates over the interval
+                if device.perf._current:
+                    device.perf._previous = device.perf._current
+                    device.perf._current = new_stats
+                else:
+                    device.perf._current = new_stats
+
+                device.perf.compute(interval)
+                device.refresh()
+
+        end = time.time()
+        self.logger.debug("OS disk stats calculated in "
+                          "{:.4f}s".format(end-now))
+
+        # fetch stats from each osd daemon
+        osd_stats_start = time.time()
+        for osd_id in self.osd_id_list:
+
+            # self.osd[osd_id] is the OSDstats entry for this daemon
+            if self.osd[osd_id]._osd_type == 'filestore':
+                osd_stats = self._fetch_osd_stats(osd_id)
+
+                # self.logger.debug('stats : {}'.format(osd_stats))
+
+                osd_device = self.osd[osd_id]
+                osd_device.update(osd_stats)
+            else:
+                self.logger.debug("skipped 'bluestore' osd perf collection "
+                                  "for osd.{}".format(osd_id))
+
+        osd_stats_end = time.time()
+        self.logger.debug(
+            "OSD perf dump stats collected for {} OSDs in {:.3f}s".format(
+                len(self.osd_id_list), (osd_stats_end - osd_stats_start)))
+
+    @staticmethod
+    def _dump_devs(device_dict):
+        """Return a dict of dev_name -> dict form of each device object."""
+
+        dumped = {}
+
+        for dev_name in sorted(device_dict):
+            device = device_dict[dev_name]
+            dumped[dev_name] = common.todict(device)
+
+        return dumped
+
+    def dump(self):
+        """
+        dump the osd object(s) to a dict. The object *must* not have references
+        to other objects - if this rule is broken cephmetrics caller will fail
+        when parsing the dict
+
+        :return: (dict) dictionary representation of this OSDs on this host
+        """
+
+        return {
+            "num_osds": self.osd_count,
+            "osd": OSDs._dump_devs(self.osd),
+            "jrnl": OSDs._dump_devs(self.jrnl)
+        }
+
+    def get_stats(self):
+        """
+        Plugin callback: rescan devices, refresh the stats, and return
+        the dict form of this host's OSDs.
+        """
+
+        start = time.time()
+
+        self._dev_to_osd()
+        self._stats_lookup()
+
+        end = time.time()
+
+        self.logger.info("osd get_stats call "
+                         ": {:.3f}s".format((end - start)))
+
+        return self.dump()
diff --git a/cephmetrics_collectors/rgw.py b/cephmetrics_collectors/rgw.py
new file mode 100644 (file)
index 0000000..772d598
--- /dev/null
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+
+import time
+
+from cephmetrics_collectors import (base, common)
+
+__author__ = "paul.cuzner@redhat.com"
+
+
class RGW(base.BaseCollector):
    """
    Collect request and latency counters from the local rados gateway
    daemon via its admin socket.
    """

    # scalar counters published as-is (name -> (collectd name, type))
    simple_metrics = {
        "req": ("requests", "derive"),
        "failed_req": ("requests_failed", "derive"),
        "get": ("gets", "derive"),
        "get_b": ("get_bytes", "derive"),
        "put": ("puts", "derive"),
        "put_b": ("put_bytes", "derive"),
        "qlen": ("qlen", "derive"),
        "qactive": ("requests_active", "derive")
    }

    # counters whose sum/avgcount sub-fields get flattened by _filter
    int_latencies = [
        "get_initial_lat",
        "put_initial_lat"
    ]

    latencies = {
        "get_initial_lat_sum": ("get_initial_lat_sum", "derive"),
        "get_initial_lat_avgcount": ("get_initial_lat_avgcount", "derive"),
        "put_initial_lat_sum": ("put_initial_lat_sum", "derive"),
        "put_initial_lat_avgcount": ("put_initial_lat_avgcount", "derive")
    }

    all_metrics = common.merge_dicts(simple_metrics, latencies)

    def __init__(self, cluster_name, admin_socket, **kwargs):
        base.BaseCollector.__init__(self, cluster_name, admin_socket, **kwargs)
        self.host_name = common.get_hostname()

    def _get_rgw_data(self):
        """Fetch this host's rgw section from the admin socket dump."""

        dump = self._admin_socket()

        # rgw counters live under a per-host client key
        return dump.get('client.rgw.{}'.format(self.host_name))

    def _filter(self, stats):
        """Reduce the raw counter dump to the metrics we publish."""

        wanted = {name: stats[name] for name in RGW.simple_metrics}

        # flatten each latency counter's sub-fields (sum/avgcount) into
        # top-level '<counter>_<field>' entries
        for counter in RGW.int_latencies:
            for field in stats[counter]:
                wanted["{}_{}".format(counter, field)] = \
                    stats[counter].get(field)

        return wanted

    def get_stats(self):
        """Plugin callback: return the filtered rgw stats."""

        begin = time.time()

        stats = self._filter(self._get_rgw_data())

        finish = time.time()
        self.logger.info(
            "RGW get_stats : {:.3f}s".format(finish - begin))

        return {"rgw": stats}