from datetime import datetime
-from threading import Event
+from threading import Event, Thread
+from itertools import chain
+from six import next
+from six.moves import queue
+from six.moves import xrange as range
import json
import errno
import six
try:
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
- from requests.exceptions import ConnectionError
+ from requests.exceptions import RequestException
except ImportError:
InfluxDBClient = None
'name': 'verify_ssl',
'default': 'true'
},
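+ # Number of worker threads draining the write queue concurrently.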
+ {
+ 'name': 'threads',
+ 'default': 5
+ },
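+ # Maximum number of points passed to a single write_points() call.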
+ {
+ 'name': 'batch_size',
+ 'default': 5000
+ }
]
@property
self.event = Event()
self.run = True
self.config = dict()
+ self.workers = list()
+ self.queue = queue.Queue(maxsize=100)
+ self.health_checks = dict()
def get_fsid(self):
return self.get('mon_map')['fsid']
else:
return False, "influxdb python module not found"
+ @staticmethod
+ def get_timestamp():
+ return datetime.utcnow().isoformat() + 'Z'
+
+ @staticmethod
+ def chunk(l, n):
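+ # Yield successive lists of at most n items drawn from iterator l;
+ # the final chunk may be shorter than n.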
+ try:
+ while True:
+ xs = []
+ for _ in range(n):
+ xs.append(next(l))
+ yield xs
+ except StopIteration:
+ if xs:
+ yield xs
+
+ def queue_worker(self):
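+ # Worker loop: pull point batches off the queue and write them to
+ # InfluxDB. A None item is the shutdown sentinel (see shutdown()).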
+ while True:
+ try:
+ points = self.queue.get()
+ if points is None:
+ self.log.debug('Worker shutting down')
+ break
+
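+ # A short-lived client per batch, so worker threads never share a
+ # connection object.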
+ start = time.time()
+ client = self.get_influx_client()
+ client.write_points(points, time_precision='ms')
+ client.close()
+ runtime = time.time() - start
+ self.log.debug('Writing %d points to Influx took %.3f seconds',
+ len(points), runtime)
+ except RequestException as e:
+ self.log.exception("Failed to connect to Influx host %s:%d",
+ self.config['hostname'], self.config['port'])
+ self.health_checks.update({
+ 'MGR_INFLUX_SEND_FAILED': {
+ 'severity': 'warning',
+ 'summary': 'Failed to send data to InfluxDB server '
+ 'at %s:%d due to a connection error'
+ % (self.config['hostname'],
+ self.config['port']),
+ 'detail': [str(e)]
+ }
+ })
+ except InfluxDBClientError as e:
+ self.health_checks.update({
+ 'MGR_INFLUX_SEND_FAILED': {
+ 'severity': 'warning',
+ 'summary': 'Failed to send data to InfluxDB',
+ 'detail': [str(e)]
+ }
+ })
+ self.log.exception('Failed to send data to InfluxDB')
+ except Exception:
+ self.log.exception('Unhandled exception while sending to Influx')
+ finally:
+ self.queue.task_done()
+
def get_latest(self, daemon_type, daemon_name, stat):
data = self.get_counter(daemon_type, daemon_name, stat)[stat]
if data:
return 0
- def get_df_stats(self):
+ def get_df_stats(self, now):
df = self.get("df")
data = []
pool_info = {}
- now = datetime.utcnow().isoformat() + 'Z'
-
df_types = [
'bytes_used',
'kb_used',
pool_info.update({str(pool['id']):pool['name']})
return data, pool_info
- def get_pg_summary(self, pool_info):
- time = datetime.utcnow().isoformat() + 'Z'
+ def get_pg_summary_osd(self, pool_info, now):
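+ # Generator variant: yields one point per (osd, stat) pair so the
+ # caller can chain and chunk points lazily instead of building a list.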
pg_sum = self.get('pg_summary')
osd_sum = pg_sum['by_osd']
- pool_sum = pg_sum['by_pool']
- data = []
for osd_id, stats in six.iteritems(osd_sum):
metadata = self.get_metadata('osd', "%s" % osd_id)
if not metadata:
continue
for stat in stats:
- point_1 = {
+ yield {
"measurement": "ceph_pg_summary_osd",
"tags": {
"ceph_daemon": "osd." + str(osd_id),
"type_instance": stat,
"host": metadata['hostname']
},
- "time" : time,
+ "time" : now,
"fields" : {
"value": stats[stat]
}
}
- data.append(point_1)
+
+ def get_pg_summary_pool(self, pool_info, now):
+ pool_sum = self.get('pg_summary')['by_pool']
for pool_id, stats in six.iteritems(pool_sum):
for stat in stats:
- point_2 = {
+ yield {
"measurement": "ceph_pg_summary_pool",
"tags": {
"pool_name" : pool_info[pool_id],
"pool_id" : pool_id,
"type_instance" : stat,
},
- "time" : time,
+ "time" : now,
"fields": {
"value" : stats[stat],
}
}
- data.append(point_2)
- return data
-
-
- def get_daemon_stats(self):
- data = []
-
- now = datetime.utcnow().isoformat() + 'Z'
+ def get_daemon_stats(self, now):
for daemon, counters in six.iteritems(self.get_all_perf_counters()):
svc_type, svc_id = daemon.split(".", 1)
metadata = self.get_metadata(svc_type, svc_id)
value = counter_info['value']
- data.append({
+ yield {
"measurement": "ceph_daemon_stats",
"tags": {
"ceph_daemon": daemon,
"fields": {
"value": value
}
- })
-
- return data
+ }
def set_config_option(self, option, value):
if option not in self.config_keys.keys():
raise RuntimeError('{0} is a unknown configuration '
'option'.format(option))
- if option in ['port', 'interval']:
+ if option in ['port', 'interval', 'threads', 'batch_size']:
try:
value = int(value)
except (ValueError, TypeError):
if option in ['ssl', 'verify_ssl']:
value = value.lower() == 'true'
+ if option == 'threads':
+ if value < 1 or value > 32:
+ raise RuntimeError('threads should be in range 1-32')
+
self.config[option] = value
def init_module_config(self):
self.config['interval'] = \
int(self.get_config("interval",
default=self.config_keys['interval']))
+ self.config['threads'] = \
+ int(self.get_config("threads",
+ default=self.config_keys['threads']))
+ self.config['batch_size'] = \
+ int(self.get_config("batch_size",
+ default=self.config_keys['batch_size']))
ssl = self.get_config("ssl", default=self.config_keys['ssl'])
self.config['ssl'] = ssl.lower() == 'true'
verify_ssl = \
self.get_config("verify_ssl", default=self.config_keys['verify_ssl'])
self.config['verify_ssl'] = verify_ssl.lower() == 'true'
+ def gather_statistics(self):
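+ # Chain all stat generators into a single iterator, stamped with one
+ # timestamp so every point from this pass aligns.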
+ now = self.get_timestamp()
+ df_stats, pools = self.get_df_stats(now)
+ return chain(df_stats, self.get_daemon_stats(now),
+ self.get_pg_summary_osd(pools, now),
+ self.get_pg_summary_pool(pools, now))
+
+ def get_influx_client(self):
+ return InfluxDBClient(self.config['hostname'],
+ self.config['port'],
+ self.config['username'],
+ self.config['password'],
+ self.config['database'],
+ self.config['ssl'],
+ self.config['verify_ssl'])
+
def send_to_influx(self):
if not self.config['hostname']:
self.log.error("No Influx server configured, please set one using: "
'detail': ['Configuration option hostname not set']
}
})
- return
+ return False
+
+ self.health_checks = dict()
- # If influx server has authentication turned off then
- # missing username/password is valid.
self.log.debug("Sending data to Influx host: %s",
self.config['hostname'])
- client = InfluxDBClient(self.config['hostname'], self.config['port'],
- self.config['username'],
- self.config['password'],
- self.config['database'],
- self.config['ssl'],
- self.config['verify_ssl'])
-
- # using influx client get_list_database requires admin privs,
- # instead we'll catch the not found exception and inform the user if
- # db can not be created
try:
- df_stats, pools = self.get_df_stats()
- client.write_points(df_stats, 'ms')
- client.write_points(self.get_daemon_stats(), 'ms')
- client.write_points(self.get_pg_summary(pools))
- self.set_health_checks(dict())
- except ConnectionError as e:
- self.log.exception("Failed to connect to Influx host %s:%d",
- self.config['hostname'], self.config['port'])
- self.set_health_checks({
- 'MGR_INFLUX_SEND_FAILED': {
- 'severity': 'warning',
- 'summary': 'Failed to send data to InfluxDB server at %s:%d'
- ' due to an connection error'
- % (self.config['hostname'], self.config['port']),
- 'detail': [str(e)]
- }
- })
- except InfluxDBClientError as e:
- if e.code == 404:
+ client = self.get_influx_client()
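+ # Listing databases may require admin privileges depending on server
+ # configuration; failures surface as MGR_INFLUX_DB_LIST_FAILED below.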
+ databases = client.get_list_database()
+ if {'name': self.config['database']} not in databases:
self.log.info("Database '%s' not found, trying to create "
- "(requires admin privs). You can also create "
+ "(requires admin privs). You can also create "
"manually and grant write privs to user "
"'%s'", self.config['database'],
- self.config['username'])
+ self.config['username'])
client.create_database(self.config['database'])
- client.create_retention_policy(name='8_weeks', duration='8w',
- replication='1', default=True,
+ client.create_retention_policy(name='8_weeks',
+ duration='8w',
+ replication='1',
+ default=True,
database=self.config['database'])
- else:
- self.set_health_checks({
- 'MGR_INFLUX_SEND_FAILED': {
- 'severity': 'warning',
- 'summary': 'Failed to send data to InfluxDB',
- 'detail': [str(e)]
- }
- })
- raise
+ client.close()
+
+ self.log.debug('Gathering statistics')
+ points = self.gather_statistics()
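+ # Non-blocking put: if the bounded queue is full, queue.Full is
+ # raised and handled below instead of stalling the serve loop.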
+ for chunk in self.chunk(points, self.config['batch_size']):
+ self.queue.put(chunk, block=False)
+
+ self.log.debug('Queue currently contains %d items',
+ self.queue.qsize())
+ except queue.Full:
+ self.health_checks.update({
+ 'MGR_INFLUX_QUEUE_FULL': {
+ 'severity': 'warning',
+ 'summary': 'Failed to add chunk to InfluxDB queue',
+ 'detail': ['Queue is full. InfluxDB might be slow with '
+ 'processing data']
+ }
+ })
+ self.log.error('Queue is full, failed to add chunk')
+ except (RequestException, InfluxDBClientError) as e:
+ self.health_checks.update({
+ 'MGR_INFLUX_DB_LIST_FAILED': {
+ 'severity': 'warning',
+ 'summary': 'Failed to list/create InfluxDB database',
+ 'detail': [str(e)]
+ }
+ })
+ self.log.exception('Failed to list/create InfluxDB database')
+ return False
+ finally:
+ self.set_health_checks(self.health_checks)
def shutdown(self):
self.log.info('Stopping influx module')
self.run = False
self.event.set()
+ self.log.debug('Shutting down queue workers')
+
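+ # One None sentinel per worker; queue.join() then waits until every
+ # queued item (including the sentinels) is marked task_done().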
+ for _ in self.workers:
+ self.queue.put(None)
+
+ self.queue.join()
+
+ for worker in self.workers:
+ worker.join()
def self_test(self):
- daemon_stats = self.get_daemon_stats()
+ now = self.get_timestamp()
+ daemon_stats = list(self.get_daemon_stats(now))
assert len(daemon_stats)
- df_stats, pools = self.get_df_stats()
+ df_stats, pools = self.get_df_stats(now)
result = {
'daemon_stats': daemon_stats,
self.init_module_config()
self.run = True
+ self.log.debug('Starting %d queue worker threads',
+ self.config['threads'])
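+ # Daemon threads will not block interpreter exit, but shutdown()
+ # still joins them for a clean stop.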
+ for _ in range(self.config['threads']):
+ worker = Thread(target=self.queue_worker)
+ worker.daemon = True
+ worker.start()
+ self.workers.append(worker)
+
while self.run:
start = time.time()
self.send_to_influx()
runtime = time.time() - start
- self.log.debug('Finished sending data in Influx in %.3f seconds',
+ self.log.debug('Finished sending data to Influx in %.3f seconds',
runtime)
self.log.debug("Sleeping for %d seconds", self.config['interval'])
self.event.wait(self.config['interval'])