--- /dev/null
+=============
+Influx Plugin
+=============
+
+The influx plugin continuously collects time series data from the cluster and sends it to an InfluxDB database. Users can specify which types of stats they want to collect.
+A set of default counters is always collected, and users can additionally choose extra counters to collect.
+
+-------------
+Configuration
+-------------
+
+In order for this module to work, a configuration file must be created at ``/etc/ceph/influx.conf``.
+
+^^^^^^^^
+Required
+^^^^^^^^
+
+The configuration file must include the following options under the ``[influx]`` header.
+
+:Configuration: **Description**
+:interval: How often, in seconds, the module collects the stats and sends them to InfluxDB
+:hostname: InfluxDB host
+:username: InfluxDB username
+:password: InfluxDB password
+:database: InfluxDB database (if the database does not already exist, the module will create it)
+:port: InfluxDB port
+:stats: Which stats to collect: ``osd``, ``pool``, and/or ``cluster``. Specify as many as you would like, separated by commas.
+
+
+^^^^^^^^
+Optional
+^^^^^^^^
+
+Additional counters can be collected for each OSD or for the whole cluster under the ``[extended]`` header.
+More information on these counters can be found below in the *extended* section. Separate each additional counter with a comma.
+
+Example config file:
+
+::
+
+ [influx]
+ interval = 10
+ hostname = samplehost
+ username = admin
+ password = pass
+ database = default
+ port = 8086
+ stats = osd, pool, cluster
+
+ [extended]
+ osd = op_latency, recovery_ops
+ cluster = op_latency
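+
+If you would like to check that the module will be able to reach the server with these settings, a quick sanity test can be run from a Python shell. This is only a sketch: it assumes the ``influxdb`` Python package is installed (the module needs it anyway) and it reuses the example values above::
+
+    from influxdb import InfluxDBClient
+
+    # Connect with the same values the module reads from influx.conf.
+    client = InfluxDBClient('samplehost', 8086, 'admin', 'pass', 'default')
+
+    # List the databases the server knows about; the module will create
+    # the configured database automatically if it does not exist yet.
+    print([db['name'] for db in client.get_list_database()])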
+
+--------
+Enabling
+--------
+
+To enable the module, the following should be performed:
+
+- Load the module by including the following in the ceph.conf file::
+
+ [mgr]
+ mgr_modules = influx
+
+- Enable the module so that it runs at the configured interval: ``ceph mgr module enable influx``.
+
+---------
+Disabling
+---------
+
+To disable the module, run ``ceph mgr module disable influx``.
+
+---------
+Debugging
+---------
+
+By default, a few debugging statements as well as error statements are printed to the log files. Users can add more if necessary.
+To make use of the debugging option in the module:
+
+- Add the following to the ceph.conf file::
+
+ [mgr]
+ debug_mgr = 20
+
+- Run the self-test command ``ceph tell mgr.<name> influx self-test``, where ``<name>`` is the id of the active manager daemon.
+- Check the log files. Users may find it easier to filter the log files using *mgr[influx]*.
+
+-----
+Usage
+-----
+
+^^^^^^^^^^^^^^^^
+Default Counters
+^^^^^^^^^^^^^^^^
+
+**pool**
+
++---------------+-----------------------------------------------------+
+|Counter | Description |
++===============+=====================================================+
+|bytes_used | Bytes used in the pool not including copies |
++---------------+-----------------------------------------------------+
+|max_avail | Max available number of bytes in the pool |
++---------------+-----------------------------------------------------+
+|objects | Number of objects in the pool |
++---------------+-----------------------------------------------------+
+|wr_bytes | Number of bytes written in the pool |
++---------------+-----------------------------------------------------+
+|dirty | Number of bytes dirty in the pool |
++---------------+-----------------------------------------------------+
+|rd_bytes | Number of bytes read in the pool |
++---------------+-----------------------------------------------------+
+|raw_bytes_used | Bytes used in pool including copies made |
++---------------+-----------------------------------------------------+
+
+**osd**
+
++------------+------------------------------------+
+|Counter | Description |
++============+====================================+
+|op_w | Client write operations |
++------------+------------------------------------+
+|op_in_bytes | Client operations total write size |
++------------+------------------------------------+
+|op_r | Client read operations |
++------------+------------------------------------+
+|op_out_bytes| Client operations total read size |
++------------+------------------------------------+
+
+
+**cluster**
+
+By default, the cluster stats cover the same counters as the osd stats, but instead of reporting each OSD individually, the module sums the performance counters across all OSDs.
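+
+Every sample is sent to InfluxDB as a single point. For example, a cluster-level point produced by the module has the following shape (the measurement, tag, and field names are the ones the module uses; the timestamp and value shown here are only illustrative)::
+
+    {
+        "measurement": "ceph_cluster_stats",
+        "tags": {
+            "mgr_id": "x",
+            "type_instance": "op_r",
+        },
+        "time": "2017-07-01T00:00:00.000000Z",
+        "fields": {
+            "value": 42
+        }
+    }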
+
+^^^^^^^^
+extended
+^^^^^^^^
+
+There are many other counters that can be collected by configuring the module, such as operation and sub-operation counters. A few of them are listed and described below; the full list of available counters
+can be found in https://github.com/ceph/ceph/blob/5a197c5817f591fc514f55b9929982e90d90084e/src/osd/OSD.cc
+
+**Operations**
+
+- Latency counters are measured in microseconds unless otherwise specified in the description.
+
++------------------------+--------------------------------------------------------------------------+
+|Counter | Description |
++========================+==========================================================================+
+|op_wip | Replication operations currently being processed (primary) |
++------------------------+--------------------------------------------------------------------------+
+|op_latency | Latency of client operations (including queue time) |
++------------------------+--------------------------------------------------------------------------+
+|op_process_latency | Latency of client operations (excluding queue time) |
++------------------------+--------------------------------------------------------------------------+
+|op_prepare_latency | Latency of client operations (excluding queue time and wait for finished)|
++------------------------+--------------------------------------------------------------------------+
+|op_r_latency | Latency of read operation (including queue time) |
++------------------------+--------------------------------------------------------------------------+
+|op_r_process_latency | Latency of read operation (excluding queue time) |
++------------------------+--------------------------------------------------------------------------+
+|op_w_in_bytes | Client data written |
++------------------------+--------------------------------------------------------------------------+
+|op_w_latency | Latency of write operation (including queue time) |
++------------------------+--------------------------------------------------------------------------+
+|op_w_process_latency | Latency of write operation (excluding queue time) |
++------------------------+--------------------------------------------------------------------------+
+|op_w_prepare_latency | Latency of write operations (excluding queue time and wait for finished) |
++------------------------+--------------------------------------------------------------------------+
+|op_rw | Client read-modify-write operations |
++------------------------+--------------------------------------------------------------------------+
+|op_rw_in_bytes | Client read-modify-write operations write in |
++------------------------+--------------------------------------------------------------------------+
+|op_rw_out_bytes | Client read-modify-write operations read out |
++------------------------+--------------------------------------------------------------------------+
+|op_rw_latency | Latency of read-modify-write operation (including queue time) |
++------------------------+--------------------------------------------------------------------------+
+|op_rw_process_latency | Latency of read-modify-write operation (excluding queue time) |
++------------------------+--------------------------------------------------------------------------+
+|op_rw_prepare_latency | Latency of read-modify-write operations (excluding queue time |
+| | and wait for finished) |
++------------------------+--------------------------------------------------------------------------+
+|op_before_queue_op_lat | Latency of IO before calling queue (before really queue into ShardedOpWq)|
++------------------------+--------------------------------------------------------------------------+
+|op_before_dequeue_op_lat| Latency of IO before calling dequeue_op(already dequeued and get PG lock)|
++------------------------+--------------------------------------------------------------------------+
--- /dev/null
+from datetime import datetime
+from threading import Event
+try:
+    from ConfigParser import SafeConfigParser  # Python 2
+except ImportError:
+    from configparser import SafeConfigParser  # Python 3
+from influxdb import InfluxDBClient
+from mgr_module import MgrModule
+
+class Module(MgrModule):
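+    """Collect Ceph stats and periodically send them to InfluxDB."""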
+
+ COMMANDS = [
+ {
+ "cmd": "influx self-test",
+ "desc": "debug the module",
+ "perm": "rw"
+ },
+ ]
+
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self.event = Event()
+ self.run = True
+
+
+ def get_latest(self, daemon_type, daemon_name, stat):
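+        """Return the most recent sample of the given perf counter for a
+        daemon, or 0 if no samples are available yet."""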
+ data = self.get_counter(daemon_type, daemon_name, stat)[stat]
+ if data:
+ return data[-1][1]
+ else:
+ return 0
+
+
+ def get_df_stats(self):
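+        """Build InfluxDB points for the per-pool utilization statistics
+        reported by the cluster 'df' dump."""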
+ df = self.get("df")
+ data = []
+
+ df_types = [
+ 'bytes_used',
+ 'dirty',
+ 'rd_bytes',
+ 'raw_bytes_used',
+ 'wr_bytes',
+ 'objects',
+ 'max_avail'
+ ]
+
+ for df_type in df_types:
+ for pool in df['pools']:
+ point = {
+ "measurement": "ceph_pool_stats",
+ "tags": {
+ "pool_name" : pool['name'],
+ "pool_id" : pool['id'],
+ "type_instance" : df_type,
+ "mgr_id" : self.get_mgr_id(),
+ },
+ "time" : datetime.utcnow().isoformat() + 'Z',
+ "fields": {
+ "value" : pool['stats'][df_type],
+ }
+ }
+ data.append(point)
+ return data
+
+
+ def get_default_stat(self):
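+        """Collect the default per-OSD counters and their sums across all
+        OSDs for the cluster-level measurement."""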
+ defaults= [
+ "op_w",
+ "op_in_bytes",
+ "op_r",
+ "op_out_bytes"
+ ]
+
+ osd_data = []
+ cluster_data = []
+ for default in defaults:
+ osdmap = self.get("osd_map")['osds']
+ value = 0
+ for osd in osdmap:
+ osd_id = osd['osd']
+ metadata = self.get_metadata('osd', "%s" % osd_id)
+                stat_value = self.get_latest("osd", str(osd_id), "osd." + default)
+                value += stat_value
+                point = {
+                    "measurement": "ceph_osd_stats",
+                    "tags": {
+                        "mgr_id": self.get_mgr_id(),
+                        "osd_id": osd_id,
+                        "type_instance": default,
+                        "host": metadata['hostname']
+                    },
+                    "time": datetime.utcnow().isoformat() + 'Z',
+                    "fields": {
+                        "value": stat_value
+                    }
+                }
+ osd_data.append(point)
+ point2 = {
+ "measurement": "ceph_cluster_stats",
+ "tags": {
+ "mgr_id": self.get_mgr_id(),
+ "type_instance": default,
+ },
+ "time" : datetime.utcnow().isoformat() + 'Z',
+ "fields" : {
+ "value": value
+ }
+ }
+ cluster_data.append(point2)
+ return osd_data, cluster_data
+
+
+
+ def get_extended(self, counter_type, type_inst):
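+        """Collect one extended perf counter for every OSD, or its sum
+        across all OSDs when counter_type is "cluster"."""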
+        path = "osd." + str(type_inst)
+ osdmap = self.get("osd_map")
+ data = []
+ value = 0
+ for osd in osdmap['osds']:
+ osd_id = osd['osd']
+ metadata = self.get_metadata('osd', "%s" % osd_id)
+            stat_value = self.get_latest("osd", str(osd_id), path)
+            value += stat_value
+            point = {
+                "measurement": "ceph_osd_stats",
+                "tags": {
+                    "mgr_id": self.get_mgr_id(),
+                    "osd_id": osd_id,
+                    "type_instance": type_inst,
+                    "host": metadata['hostname']
+                },
+                "time": datetime.utcnow().isoformat() + 'Z',
+                "fields": {
+                    "value": stat_value
+                }
+            }
+ data.append(point)
+ if counter_type == "cluster":
+ point = [{
+ "measurement": "ceph_cluster_stats",
+ "tags": {
+ "mgr_id": self.get_mgr_id(),
+ "type_instance": type_inst,
+ },
+ "time" : datetime.utcnow().isoformat() + 'Z',
+ "fields" : {
+ "value": value
+ }
+ }]
+ return point
+ else:
+ return data
+
+ def send_to_influx(self):
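+        """Read /etc/ceph/influx.conf, make sure the configured database
+        exists, and write the requested stats to InfluxDB."""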
+ config = SafeConfigParser()
+ config.read('/etc/ceph/influx.conf')
+ host = config.get('influx','hostname')
+ username = config.get('influx', 'username')
+ password = config.get('influx', 'password')
+ database = config.get('influx', 'database')
+ port = int(config.get('influx','port'))
+ stats = config.get('influx', 'stats').replace(' ', '').split(',')
+ client = InfluxDBClient(host, port, username, password, database)
+ databases_avail = client.get_list_database()
+ default_stats = self.get_default_stat()
+ for database_avail in databases_avail:
+            if database_avail['name'] == database:
+ break
+ else:
+ client.create_database(database)
+
+
+
+ for stat in stats:
+ if stat == "pool":
+ client.write_points(self.get_df_stats(), 'ms')
+
+ elif stat == "osd":
+ client.write_points(default_stats[0], 'ms')
+ if config.has_option('extended', 'osd'):
+ osds = config.get('extended', 'osd').replace(' ', '').split(',')
+ for osd in osds:
+ client.write_points(self.get_extended("osd", osd), 'ms')
+ self.log.debug("wrote osd stats")
+
+ elif stat == "cluster":
+ client.write_points(default_stats[-1], 'ms')
+ if config.has_option('extended', 'cluster'):
+ clusters = config.get('extended', 'cluster').replace(' ', '').split(',')
+ for cluster in clusters:
+ client.write_points(self.get_extended("cluster", cluster), 'ms')
+ self.log.debug("wrote cluster stats")
+ else:
+ self.log.error("invalid stat")
+
+ def shutdown(self):
+ self.log.info('Stopping influx module')
+ self.run = False
+ self.event.set()
+
+ def handle_command(self, cmd):
+ if cmd['prefix'] == 'influx self-test':
+ self.send_to_influx()
+            return 0, '', 'debugging module'
+        else:
+            self.log.error("Command not found: %s", cmd['prefix'])
+            raise NotImplementedError(cmd['prefix'])
+
+ def serve(self):
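+        """Main loop: push stats to InfluxDB every `interval` seconds until
+        shutdown() is called."""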
+ self.log.info('Starting influx module')
+ self.run = True
+ config = SafeConfigParser()
+ config.read('/etc/ceph/influx.conf')
+ while self.run:
+ self.send_to_influx()
+ self.log.debug("Running interval loop")
+ interval = int(config.get('influx','interval'))
+ self.log.debug("sleeping for %d seconds",interval)
+ self.event.wait(interval)
+
\ No newline at end of file