]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/telegraf: Telegraf module for Ceph Mgr 21782/head
authorWido den Hollander <wido@42on.com>
Mon, 26 Mar 2018 11:47:45 +0000 (13:47 +0200)
committerWido den Hollander <wido@42on.com>
Wed, 9 May 2018 14:00:15 +0000 (16:00 +0200)
Telegraf is a agent for collecting and reporting metrics.

It has multiple inputs and can send data to various outputs like
for example InfluxDB or ElasticSearch.

This module works by using the socket_listener of Telegraf and can
send data over UDP, TCP and a local Unix Socket.

Signed-off-by: Wido den Hollander <wido@42on.com>
PendingReleaseNotes
doc/mgr/index.rst
doc/mgr/telegraf.rst [new file with mode: 0644]
qa/tasks/mgr/test_module_selftest.py
src/pybind/mgr/telegraf/__init__.py [new file with mode: 0644]
src/pybind/mgr/telegraf/basesocket.py [new file with mode: 0644]
src/pybind/mgr/telegraf/module.py [new file with mode: 0644]
src/pybind/mgr/telegraf/protocol.py [new file with mode: 0644]
src/pybind/mgr/telegraf/utils.py [new file with mode: 0644]

index 3e3d34faa376a20abffcef5a1f9763bc83422b95..d98903ef503622eab9d25de3ea4e872360e87ca0 100644 (file)
@@ -223,3 +223,8 @@ for details.
   is permanent and instructs the cluster to proceed with an empty PG
   in its place, without making any further efforts to find the missing
   data.
+
+* The Telegraf module for the Manager allows for sending statistics to
+  an Telegraf Agent over TCP, UDP or a UNIX Socket. Telegraf can then
+  send the statistics to databases like InfluxDB, ElasticSearch, Graphite
+  and many more.
index 21fe2bed9927600d8732cde12118210b8e47887a..d028bda2d636ff34b7ccbdc4daedc9dbffcf27fa 100644 (file)
@@ -35,4 +35,4 @@ sensible.
     Prometheus plugin <prometheus>
     Influx plugin <influx>
     Hello plugin <hello>
-
+    Telegraf plugin <telegraf>
diff --git a/doc/mgr/telegraf.rst b/doc/mgr/telegraf.rst
new file mode 100644 (file)
index 0000000..6b629e2
--- /dev/null
@@ -0,0 +1,88 @@
+===============
+Telegraf Plugin 
+===============
+The Telegraf plugin collects and sends statistics series to a Telegraf agent.
+
+The Telegraf agent can buffer, aggegrate, parse and process the data before
+sending it to an output which can be InfluxDB, ElasticSearch and many more.
+
+Currently the only way to send statistics to Telegraf from this module is to
+use the socket listener. The module can send statistics over UDP, TCP or
+a UNIX socket.
+
+The Telegraf plugin was introduced in the 13.x *Mimic* release.
+
+--------
+Enabling 
+--------
+
+To enable the module, use the following command:
+
+::
+
+    ceph mgr module enable telegraf
+
+If you wish to subsequently disable the module, you can use the corresponding
+*disable* command:
+
+::
+
+    ceph mgr module disable telegraf
+
+-------------
+Configuration 
+-------------
+
+For the telegraf module to send statistics to a Telegraf agent it is
+required to configure the address to send the statistics to.
+
+Set configuration values using the following command:
+
+::
+
+    ceph telegraf config-set <key> <value>
+
+
+The most important settings are ``address`` and ``interval``.
+
+For example, a typical configuration might look like this:
+
+::
+
+    ceph telegraf config-set address udp://:8094
+    ceph telegraf config-set interval 10
+    
+The default values for these configuration keys are:
+
+- address: unixgram:///tmp/telegraf.sock
+- interval: 15
+
+----------------
+Socket Listener
+----------------
+The module only supports sending data to Telegraf through the socket listener
+of the Telegraf module using the Influx data format.
+
+A typical Telegraf configuration might be:
+
+
+    [[inputs.socket_listener]]
+    # service_address = "tcp://:8094"
+    # service_address = "tcp://127.0.0.1:http"
+    # service_address = "tcp4://:8094"
+    # service_address = "tcp6://:8094"
+    # service_address = "tcp6://[2001:db8::1]:8094"
+    service_address = "udp://:8094"
+    # service_address = "udp4://:8094"
+    # service_address = "udp6://:8094"
+    # service_address = "unix:///tmp/telegraf.sock"
+    # service_address = "unixgram:///tmp/telegraf.sock"
+    data_format = "influx"
+
+In this case the `address` configuration option for the module would need to be set
+to:
+
+  udp://:8094
+
+
+Refer to the Telegraf documentation for more configuration options.
index d13f67920431d8100a9392557a6defb28825dbaa..2239432b59a348047f373f73e21f4d41f110e2fd 100644 (file)
@@ -45,6 +45,9 @@ class TestModuleSelftest(MgrTestCase):
     def test_influx(self):
         self._selftest_plugin("influx")
 
+    def test_telegraf(self):
+        self._selftest_plugin("telegraf")
+
     def test_iostat(self):
         self._selftest_plugin("iostat")
 
diff --git a/src/pybind/mgr/telegraf/__init__.py b/src/pybind/mgr/telegraf/__init__.py
new file mode 100644 (file)
index 0000000..8f210ac
--- /dev/null
@@ -0,0 +1 @@
+from .module import Module
diff --git a/src/pybind/mgr/telegraf/basesocket.py b/src/pybind/mgr/telegraf/basesocket.py
new file mode 100644 (file)
index 0000000..4359946
--- /dev/null
@@ -0,0 +1,45 @@
+import socket
+
+
+class BaseSocket(object):
+    schemes = {
+        'unixgram': (socket.AF_UNIX, socket.SOCK_DGRAM),
+        'unix': (socket.AF_UNIX, socket.SOCK_STREAM),
+        'tcp': (socket.AF_INET, socket.SOCK_STREAM),
+        'tcp6': (socket.AF_INET6, socket.SOCK_STREAM),
+        'udp': (socket.AF_INET, socket.SOCK_DGRAM),
+        'udp6': (socket.AF_INET6, socket.SOCK_DGRAM),
+    }
+
+    def __init__(self, url):
+        self.url = url
+
+        try:
+            socket_family, socket_type = self.schemes[self.url.scheme]
+        except KeyError:
+            raise RuntimeError('Unsupported socket type: %s', self.url.scheme)
+
+        self.sock = socket.socket(family=socket_family, type=socket_type)
+        if self.sock.family == socket.AF_UNIX:
+            self.address = self.url.path
+        else:
+            self.address = (self.url.hostname, self.url.port)
+
+    def connect(self):
+        return self.sock.connect(self.address)
+
+    def close(self):
+        self.sock.close()
+
+    def send(self, data, flags=0):
+        return self.sock.send(data.encode('utf-8') + b'\n', flags)
+
+    def __del__(self):
+        self.sock.close()
+
+    def __enter__(self):
+        self.connect()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
diff --git a/src/pybind/mgr/telegraf/module.py b/src/pybind/mgr/telegraf/module.py
new file mode 100644 (file)
index 0000000..32d5a9f
--- /dev/null
@@ -0,0 +1,313 @@
+import errno
+import json
+import socket
+import time
+from threading import Event
+
+from telegraf.basesocket import BaseSocket
+from telegraf.protocol import Line
+from mgr_module import MgrModule
+
+try:
+    from urllib.parse import urlparse
+except ImportError:
+    from urlparse import urlparse
+
+
+class Module(MgrModule):
+    COMMANDS = [
+        {
+            "cmd": "telegraf config-set name=key,type=CephString "
+                   "name=value,type=CephString",
+            "desc": "Set a configuration value",
+            "perm": "rw"
+        },
+        {
+            "cmd": "telegraf config-show",
+            "desc": "Show current configuration",
+            "perm": "r"
+        },
+        {
+            "cmd": "telegraf send",
+            "desc": "Force sending data to Telegraf",
+            "perm": "rw"
+        },
+        {
+            "cmd": "telegraf self-test",
+            "desc": "debug the module",
+            "perm": "rw"
+        },
+    ]
+
+    OPTIONS = [
+        {
+            'name': 'address',
+            'default': 'unixgram:///tmp/telegraf.sock',
+        },
+        {
+            'name': 'interval',
+            'default': 15
+        }
+    ]
+
+    ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
+
+    @property
+    def config_keys(self):
+        return dict((o['name'], o.get('default', None)) for o in self.OPTIONS)
+
+    def __init__(self, *args, **kwargs):
+        super(Module, self).__init__(*args, **kwargs)
+        self.event = Event()
+        self.run = True
+        self.fsid = None
+        self.config = dict()
+
+    def get_fsid(self):
+        if not self.fsid:
+            self.fsid = self.get('mon_map')['fsid']
+
+        return self.fsid
+
+    def get_pool_stats(self):
+        df = self.get('df')
+        data = []
+
+        df_types = [
+            'bytes_used',
+            'kb_used',
+            'dirty',
+            'rd',
+            'rd_bytes',
+            'raw_bytes_used',
+            'wr',
+            'wr_bytes',
+            'objects',
+            'max_avail',
+            'quota_objects',
+            'quota_bytes'
+        ]
+
+        for df_type in df_types:
+            for pool in df['pools']:
+                point = {
+                    'measurement': 'ceph_pool_stats',
+                    'tags': {
+                        'pool_name': pool['name'],
+                        'pool_id': pool['id'],
+                        'type_instance': df_type,
+                        'fsid': self.get_fsid()
+                    },
+                    'value': pool['stats'][df_type],
+                }
+                data.append(point)
+        return data
+
+    def get_daemon_stats(self):
+        data = []
+
+        for daemon, counters in self.get_all_perf_counters().iteritems():
+            svc_type, svc_id = daemon.split('.', 1)
+            metadata = self.get_metadata(svc_type, svc_id)
+
+            for path, counter_info in counters.items():
+                if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
+                    continue
+
+                data.append({
+                    'measurement': 'ceph_daemon_stats',
+                    'tags': {
+                        'ceph_daemon': daemon,
+                        'type_instance': path,
+                        'host': metadata['hostname'],
+                        'fsid': self.get_fsid()
+                    },
+                    'value': counter_info['value']
+                })
+
+        return data
+
+    def get_cluster_stats(self):
+        stats = dict()
+
+        health = json.loads(self.get('health')['json'])
+        stats['health'] = self.ceph_health_mapping.get(health['status'])
+
+        mon_status = json.loads(self.get('mon_status')['json'])
+        stats['num_mon'] = len(mon_status['monmap']['mons'])
+
+        stats['mon_election_epoch'] = mon_status['election_epoch']
+        stats['mon_outside_quorum'] = len(mon_status['outside_quorum'])
+        stats['mon_quorum'] = len(mon_status['quorum'])
+
+        osd_map = self.get('osd_map')
+        stats['num_osd'] = len(osd_map['osds'])
+        stats['num_pg_temp'] = len(osd_map['pg_temp'])
+        stats['osd_epoch'] = osd_map['epoch']
+
+        mgr_map = self.get('mgr_map')
+        stats['mgr_available'] = int(mgr_map['available'])
+        stats['num_mgr_standby'] = len(mgr_map['standbys'])
+        stats['mgr_epoch'] = mgr_map['epoch']
+
+        num_up = 0
+        num_in = 0
+        for osd in osd_map['osds']:
+            if osd['up'] == 1:
+                num_up += 1
+
+            if osd['in'] == 1:
+                num_in += 1
+
+        stats['num_osd_up'] = num_up
+        stats['num_osd_in'] = num_in
+
+        fs_map = self.get('fs_map')
+        stats['num_mds_standby'] = len(fs_map['standbys'])
+        stats['num_fs'] = len(fs_map['filesystems'])
+        stats['mds_epoch'] = fs_map['epoch']
+
+        num_mds_up = 0
+        for fs in fs_map['filesystems']:
+            num_mds_up += len(fs['mdsmap']['up'])
+
+        stats['num_mds_up'] = num_mds_up
+        stats['num_mds'] = num_mds_up + stats['num_mds_standby']
+
+        pg_status = self.get('pg_status')
+        for key in ['bytes_total', 'data_bytes', 'bytes_used', 'bytes_avail',
+                    'num_pgs', 'num_objects', 'num_pools']:
+            stats[key] = pg_status[key]
+
+        stats['num_pgs_active'] = 0
+        stats['num_pgs_clean'] = 0
+        stats['num_pgs_scrubbing'] = 0
+        stats['num_pgs_peering'] = 0
+        for state in pg_status['pgs_by_state']:
+            states = state['state_name'].split('+')
+
+            if 'active' in states:
+                stats['num_pgs_active'] += state['count']
+
+            if 'clean' in states:
+                stats['num_pgs_clean'] += state['count']
+
+            if 'peering' in states:
+                stats['num_pgs_peering'] += state['count']
+
+            if 'scrubbing' in states:
+                stats['num_pgs_scrubbing'] += state['count']
+
+        data = list()
+        for key, value in stats.items():
+            data.append({
+                'measurement': 'ceph_cluster_stats',
+                'tags': {
+                    'type_instance': key,
+                    'fsid': self.get_fsid()
+                },
+                'value': int(value)
+            })
+
+        return data
+
+    def set_config_option(self, option, value):
+        if option not in self.config_keys.keys():
+            raise RuntimeError('{0} is a unknown configuration '
+                               'option'.format(option))
+
+        if option in ['interval']:
+            try:
+                value = int(value)
+            except (ValueError, TypeError):
+                raise RuntimeError('invalid {0} configured. Please specify '
+                                   'a valid integer'.format(option))
+
+        if option == 'interval' and value < 5:
+            raise RuntimeError('interval should be set to at least 5 seconds')
+
+        self.config[option] = value
+
+    def init_module_config(self):
+        self.config['address'] = \
+            self.get_config("address", default=self.config_keys['address'])
+        self.config['interval'] = \
+            int(self.get_config("interval",
+                                default=self.config_keys['interval']))
+
+    def now(self):
+        return int(round(time.time() * 1000000000))
+
+    def gather_measurements(self):
+        measurements = list()
+        measurements += self.get_pool_stats()
+        measurements += self.get_daemon_stats()
+        measurements += self.get_cluster_stats()
+        return measurements
+
+    def send_to_telegraf(self):
+        url = urlparse(self.config['address'])
+
+        sock = BaseSocket(url)
+        self.log.debug('Sending data to Telegraf at %s', sock.address)
+        now = self.now()
+        with sock as s:
+            try:
+                for measurement in self.gather_measurements():
+                    self.log.debug(measurement)
+                    line = Line(measurement['measurement'],
+                                measurement['value'],
+                                measurement['tags'], now)
+                    self.log.debug(line.to_line_protocol())
+                    s.send(line.to_line_protocol())
+            except (socket.error, RuntimeError, errno, IOError):
+                self.log.exception('Failed to send statistics to Telegraf:')
+
+    def shutdown(self):
+        self.log.info('Stopping Telegraf module')
+        self.run = False
+        self.event.set()
+
+    def handle_command(self, cmd):
+        if cmd['prefix'] == 'telegraf config-show':
+            return 0, json.dumps(self.config), ''
+        elif cmd['prefix'] == 'telegraf config-set':
+            key = cmd['key']
+            value = cmd['value']
+            if not value:
+                return -errno.EINVAL, '', 'Value should not be empty or None'
+
+            self.log.debug('Setting configuration option %s to %s', key, value)
+            self.set_config_option(key, value)
+            self.set_config(key, value)
+            return 0, 'Configuration option {0} updated'.format(key), ''
+        elif cmd['prefix'] == 'telegraf send':
+            self.send_to_telegraf()
+            return 0, 'Sending data to Telegraf', ''
+        if cmd['prefix'] == 'telegraf self-test':
+            self.self_test()
+            return 0, '', 'Self-test OK'
+
+        return (-errno.EINVAL, '',
+                "Command not found '{0}'".format(cmd['prefix']))
+
+    def self_test(self):
+        measurements = self.gather_measurements()
+        if len(measurements) == 0:
+            raise RuntimeError('No measurements found')
+
+    def serve(self):
+        self.log.info('Starting Telegraf module')
+        self.init_module_config()
+        self.run = True
+
+        self.log.debug('Waiting 10 seconds before starting')
+        self.event.wait(10)
+
+        while self.run:
+            start = self.now()
+            self.send_to_telegraf()
+            runtime = (self.now() - start) / 1000000
+            self.log.debug('Sending data to Telegraf took %d ms', runtime)
+            self.log.debug("Sleeping for %d seconds", self.config['interval'])
+            self.event.wait(self.config['interval'])
diff --git a/src/pybind/mgr/telegraf/protocol.py b/src/pybind/mgr/telegraf/protocol.py
new file mode 100644 (file)
index 0000000..d243e0c
--- /dev/null
@@ -0,0 +1,44 @@
+from telegraf.utils import format_string, format_value
+
+
+class Line(object):
+    def __init__(self, measurement, values, tags=None, timestamp=None):
+        self.measurement = measurement
+        self.values = values
+        self.tags = tags
+        self.timestamp = timestamp
+
+    def get_output_measurement(self):
+        return format_string(self.measurement)
+
+    def get_output_values(self):
+        if not isinstance(self.values, dict):
+            metric_values = {'value': self.values}
+        else:
+            metric_values = self.values
+
+        sorted_values = sorted(metric_values.items())
+        sorted_values = [(k, v) for k, v in sorted_values if v is not None]
+
+        return u','.join(u'{0}={1}'.format(format_string(k), format_value(v)) for k, v in sorted_values)
+
+    def get_output_tags(self):
+        if not self.tags:
+            self.tags = dict()
+
+        sorted_tags = sorted(self.tags.items())
+
+        return u','.join(u'{0}={1}'.format(format_string(k), format_string(v)) for k, v in sorted_tags)
+
+    def get_output_timestamp(self):
+        return ' {0}'.format(self.timestamp) if self.timestamp else ''
+
+    def to_line_protocol(self):
+        tags = self.get_output_tags()
+
+        return u'{0}{1} {2}{3}'.format(
+            self.get_output_measurement(),
+            "," + tags if tags else '',
+            self.get_output_values(),
+            self.get_output_timestamp()
+        )
diff --git a/src/pybind/mgr/telegraf/utils.py b/src/pybind/mgr/telegraf/utils.py
new file mode 100644 (file)
index 0000000..4c7fd1c
--- /dev/null
@@ -0,0 +1,20 @@
+def format_string(key):
+    if isinstance(key, str):
+        key = key.replace(',', r'\,')
+        key = key.replace(' ', r'\ ')
+        key = key.replace('=', r'\=')
+    return key
+
+
+def format_value(value):
+    if isinstance(value, str):
+        value = value.replace('"', '\"')
+        value = u'"{0}"'.format(value)
+    elif isinstance(value, bool):
+        value = str(value)
+    elif isinstance(value, int):
+        value = "{0}i".format(value)
+    elif isinstance(value, float):
+        value = str(value)
+    return value
+