From 25e7d313316b8131cc9e3dbbbd4ce6959a92756b Mon Sep 17 00:00:00 2001 From: Wido den Hollander Date: Mon, 6 Aug 2018 11:45:13 +0200 Subject: [PATCH] mgr/influx: Use Queue to store points which need to be written This allows us to multiplex data being send to Influx as we have a configurable amount of workers sending data to Influx. The main bottleneck for the performance seems to be fetching all the perf counters using this code: self.get_all_perf_counters() On a larger cluster, for example 2000 OSDs this can take about 20s where flushing to Influx only takes 5s. A 2000 OSD cluster generates about 100k data points on every run, prior to using a Queue these would all be send to Influx in series in that took over 15 seconds to complete. Python Six is being used in the code to make sure it's compatible with both Python 2 and 3. Signed-off-by: Wido den Hollander --- doc/mgr/influx.rst | 4 +- src/pybind/mgr/influx/module.py | 254 ++++++++++++++++++++++---------- 2 files changed, 182 insertions(+), 76 deletions(-) diff --git a/doc/mgr/influx.rst b/doc/mgr/influx.rst index c9a58a831f4ad..7fefcff5e9d18 100644 --- a/doc/mgr/influx.rst +++ b/doc/mgr/influx.rst @@ -50,11 +50,13 @@ For example, a typical configuration might look like this: Additional optional configuration settings are: -:interval: Time between reports to InfluxDB. Default 5 seconds. +:interval: Time between reports to InfluxDB. Default 30 seconds. :database: InfluxDB database name. Default "ceph". You will need to create this database and grant write privileges to the configured username or the username must have admin privileges to create it. :port: InfluxDB server port. Default 8086 :ssl: Use https connection for InfluxDB server. Use "true" or "false". Default false :verify_ssl: Verify https cert for InfluxDB server. Use "true" or "false". Default true +:threads: How many worker threads should be spawned for sending data to InfluxDB. Default is 5 +:batch_size: How big batches of data points should be when sending to InfluxDB. Default is 5000 --------- Debugging diff --git a/src/pybind/mgr/influx/module.py b/src/pybind/mgr/influx/module.py index 278a298920deb..761c53ece5c75 100644 --- a/src/pybind/mgr/influx/module.py +++ b/src/pybind/mgr/influx/module.py @@ -1,5 +1,9 @@ from datetime import datetime -from threading import Event +from threading import Event, Thread +from itertools import chain +from six import next +from six.moves import queue +from six.moves import xrange as range import json import errno import six @@ -10,7 +14,7 @@ from mgr_module import MgrModule try: from influxdb import InfluxDBClient from influxdb.exceptions import InfluxDBClientError - from requests.exceptions import ConnectionError + from requests.exceptions import RequestException except ImportError: InfluxDBClient = None @@ -49,6 +53,14 @@ class Module(MgrModule): 'name': 'verify_ssl', 'default': 'true' }, + { + 'name': 'threads', + 'default': 5 + }, + { + 'name': 'batch_size', + 'default': 5000 + } ] @property @@ -80,6 +92,9 @@ class Module(MgrModule): self.event = Event() self.run = True self.config = dict() + self.workers = list() + self.queue = queue.Queue(maxsize=100) + self.health_checks = dict() def get_fsid(self): return self.get('mon_map')['fsid'] @@ -91,6 +106,65 @@ class Module(MgrModule): else: return False, "influxdb python module not found" + @staticmethod + def get_timestamp(): + return datetime.utcnow().isoformat() + 'Z' + + @staticmethod + def chunk(l, n): + try: + while True: + xs = [] + for _ in range(n): + xs.append(next(l)) + yield xs + except StopIteration: + yield xs + + def queue_worker(self): + while True: + try: + points = self.queue.get() + if points is None: + self.log.debug('Worker shutting down') + break + + start = time.time() + client = self.get_influx_client() + client.write_points(points, time_precision='ms') + client.close() + runtime = time.time() - start + self.log.debug('Writing points %d to Influx took %.3f seconds', + len(points), runtime) + except RequestException as e: + self.log.exception("Failed to connect to Influx host %s:%d", + self.config['hostname'], self.config['port']) + self.health_checks.update({ + 'MGR_INFLUX_SEND_FAILED': { + 'severity': 'warning', + 'summary': 'Failed to send data to InfluxDB server ' + 'at %s:%d due to an connection error' + % (self.config['hostname'], + self.config['port']), + 'detail': [str(e)] + } + }) + except InfluxDBClientError as e: + self.health_checks.update({ + 'MGR_INFLUX_SEND_FAILED': { + 'severity': 'warning', + 'summary': 'Failed to send data to InfluxDB', + 'detail': [str(e)] + } + }) + self.log.exception('Failed to send data to InfluxDB') + except queue.Empty: + continue + except: + self.log.exception('Unhandled Exception while sending to Influx') + finally: + self.queue.task_done() + def get_latest(self, daemon_type, daemon_name, stat): data = self.get_counter(daemon_type, daemon_name, stat)[stat] if data: @@ -98,13 +172,11 @@ class Module(MgrModule): return 0 - def get_df_stats(self): + def get_df_stats(self, now): df = self.get("df") data = [] pool_info = {} - now = datetime.utcnow().isoformat() + 'Z' - df_types = [ 'bytes_used', 'kb_used', @@ -139,54 +211,46 @@ class Module(MgrModule): pool_info.update({str(pool['id']):pool['name']}) return data, pool_info - def get_pg_summary(self, pool_info): - time = datetime.utcnow().isoformat() + 'Z' + def get_pg_summary_osd(self, pool_info, now): pg_sum = self.get('pg_summary') osd_sum = pg_sum['by_osd'] - pool_sum = pg_sum['by_pool'] - data = [] for osd_id, stats in six.iteritems(osd_sum): metadata = self.get_metadata('osd', "%s" % osd_id) if not metadata: continue for stat in stats: - point_1 = { + yield { "measurement": "ceph_pg_summary_osd", "tags": { "ceph_daemon": "osd." + str(osd_id), "type_instance": stat, "host": metadata['hostname'] }, - "time" : time, + "time" : now, "fields" : { "value": stats[stat] } } - data.append(point_1) + + def get_pg_summary_pool(self, pool_info, now): + pool_sum = self.get('pg_summary')['by_pool'] for pool_id, stats in six.iteritems(pool_sum): for stat in stats: - point_2 = { + yield { "measurement": "ceph_pg_summary_pool", "tags": { "pool_name" : pool_info[pool_id], "pool_id" : pool_id, "type_instance" : stat, }, - "time" : time, + "time" : now, "fields": { "value" : stats[stat], } } - data.append(point_2) - return data - - - def get_daemon_stats(self): - data = [] - - now = datetime.utcnow().isoformat() + 'Z' + def get_daemon_stats(self, now): for daemon, counters in six.iteritems(self.get_all_perf_counters()): svc_type, svc_id = daemon.split(".", 1) metadata = self.get_metadata(svc_type, svc_id) @@ -197,7 +261,7 @@ class Module(MgrModule): value = counter_info['value'] - data.append({ + yield { "measurement": "ceph_daemon_stats", "tags": { "ceph_daemon": daemon, @@ -209,16 +273,14 @@ class Module(MgrModule): "fields": { "value": value } - }) - - return data + } def set_config_option(self, option, value): if option not in self.config_keys.keys(): raise RuntimeError('{0} is a unknown configuration ' 'option'.format(option)) - if option in ['port', 'interval']: + if option in ['port', 'interval', 'threads', 'batch_size']: try: value = int(value) except (ValueError, TypeError): @@ -231,6 +293,10 @@ class Module(MgrModule): if option in ['ssl', 'verify_ssl']: value = value.lower() == 'true' + if option == 'threads': + if 1 > value > 32: + raise RuntimeError('threads should be in range 1-32') + self.config[option] = value def init_module_config(self): @@ -247,12 +313,34 @@ class Module(MgrModule): self.config['interval'] = \ int(self.get_config("interval", default=self.config_keys['interval'])) + self.config['threads'] = \ + int(self.get_config("threads", + default=self.config_keys['threads'])) + self.config['batch_size'] = \ + int(self.get_config("batch_size", + default=self.config_keys['batch_size'])) ssl = self.get_config("ssl", default=self.config_keys['ssl']) self.config['ssl'] = ssl.lower() == 'true' verify_ssl = \ self.get_config("verify_ssl", default=self.config_keys['verify_ssl']) self.config['verify_ssl'] = verify_ssl.lower() == 'true' + def gather_statistics(self): + now = self.get_timestamp() + df_stats, pools = self.get_df_stats(now) + return chain(df_stats, self.get_daemon_stats(now), + self.get_pg_summary_osd(pools, now), + self.get_pg_summary_pool(pools, now)) + + def get_influx_client(self): + return InfluxDBClient(self.config['hostname'], + self.config['port'], + self.config['username'], + self.config['password'], + self.config['database'], + self.config['ssl'], + self.config['verify_ssl']) + def send_to_influx(self): if not self.config['hostname']: self.log.error("No Influx server configured, please set one using: " @@ -265,70 +353,78 @@ class Module(MgrModule): 'detail': ['Configuration option hostname not set'] } }) - return + return False + + self.health_checks = dict() - # If influx server has authentication turned off then - # missing username/password is valid. self.log.debug("Sending data to Influx host: %s", self.config['hostname']) - client = InfluxDBClient(self.config['hostname'], self.config['port'], - self.config['username'], - self.config['password'], - self.config['database'], - self.config['ssl'], - self.config['verify_ssl']) - - # using influx client get_list_database requires admin privs, - # instead we'll catch the not found exception and inform the user if - # db can not be created try: - df_stats, pools = self.get_df_stats() - client.write_points(df_stats, 'ms') - client.write_points(self.get_daemon_stats(), 'ms') - client.write_points(self.get_pg_summary(pools)) - self.set_health_checks(dict()) - except ConnectionError as e: - self.log.exception("Failed to connect to Influx host %s:%d", - self.config['hostname'], self.config['port']) - self.set_health_checks({ - 'MGR_INFLUX_SEND_FAILED': { - 'severity': 'warning', - 'summary': 'Failed to send data to InfluxDB server at %s:%d' - ' due to an connection error' - % (self.config['hostname'], self.config['port']), - 'detail': [str(e)] - } - }) - except InfluxDBClientError as e: - if e.code == 404: + client = self.get_influx_client() + databases = client.get_list_database() + if {'name': self.config['database']} not in databases: self.log.info("Database '%s' not found, trying to create " - "(requires admin privs). You can also create " + "(requires admin privs). You can also create " "manually and grant write privs to user " "'%s'", self.config['database'], - self.config['username']) + self.config['database']) client.create_database(self.config['database']) - client.create_retention_policy(name='8_weeks', duration='8w', - replication='1', default=True, + client.create_retention_policy(name='8_weeks', + duration='8w', + replication='1', + default=True, database=self.config['database']) - else: - self.set_health_checks({ - 'MGR_INFLUX_SEND_FAILED': { - 'severity': 'warning', - 'summary': 'Failed to send data to InfluxDB', - 'detail': [str(e)] - } - }) - raise + client.close() + + self.log.debug('Gathering statistics') + points = self.gather_statistics() + for chunk in self.chunk(points, self.config['batch_size']): + self.queue.put(chunk, block=False) + + self.log.debug('Queue currently contains %d items', + self.queue.qsize()) + except queue.Full: + self.health_checks.update({ + 'MGR_INFLUX_QUEUE_FULL': { + 'severity': 'warning', + 'summary': 'Failed to chunk to InfluxDB Queue', + 'detail': ['Queue is full. InfluxDB might be slow with ' + 'processing data'] + } + }) + self.log.error('Queue is full, failed to add chunk') + except (RequestException, InfluxDBClientError) as e: + self.health_checks.update({ + 'MGR_INFLUX_DB_LIST_FAILED': { + 'severity': 'warning', + 'summary': 'Failed to list/create InfluxDB database', + 'detail': [str(e)] + } + }) + self.log.exception('Failed to list/create InfluxDB database') + return False + finally: + self.set_health_checks(self.health_checks) def shutdown(self): self.log.info('Stopping influx module') self.run = False self.event.set() + self.log.debug('Shutting down queue workers') + + for _ in self.workers: + self.queue.put(None) + + self.queue.join() + + for worker in self.workers: + worker.join() def self_test(self): - daemon_stats = self.get_daemon_stats() + now = self.get_timestamp() + daemon_stats = list(self.get_daemon_stats(now)) assert len(daemon_stats) - df_stats, pools = self.get_df_stats() + df_stats, pools = self.get_df_stats(now) result = { 'daemon_stats': daemon_stats, @@ -367,11 +463,19 @@ class Module(MgrModule): self.init_module_config() self.run = True + self.log.debug('Starting %d queue worker threads', + self.config['threads']) + for i in range(self.config['threads']): + worker = Thread(target=self.queue_worker, args=()) + worker.setDaemon(True) + worker.start() + self.workers.append(worker) + while self.run: start = time.time() self.send_to_influx() runtime = time.time() - start - self.log.debug('Finished sending data in Influx in %.3f seconds', + self.log.debug('Finished sending data to Influx in %.3f seconds', runtime) self.log.debug("Sleeping for %d seconds", self.config['interval']) self.event.wait(self.config['interval']) -- 2.39.5