-from collections import defaultdict
+
from datetime import datetime
-import json
-import sys
from threading import Event
-import time
from ConfigParser import SafeConfigParser
from influxdb import InfluxDBClient
from mgr_module import MgrModule
+from mgr_module import PERFCOUNTER_HISTOGRAM
class Module(MgrModule):
-
COMMANDS = [
{
"cmd": "influx self-test",
data.append(point)
return data
+ def get_daemon_stats(self):
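+        """
+        Build a list of InfluxDB points, one per perf counter per daemon.
+        """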
+ data = []
- def get_default_stat(self):
- defaults= [
- "op_w",
- "op_in_bytes",
- "op_r",
- "op_out_bytes"
- ]
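+        # Walk the perf counters of every daemon registered with the mgr
+        # instead of a fixed list of OSD counters.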
+        for daemon, counters in self.get_all_perf_counters().items():
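+            # Daemon keys look like "osd.0" or "mds.a"; the metadata lookup
+            # supplies the hostname used to tag each point.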
+ svc_type, svc_id = daemon.split(".")
+ metadata = self.get_metadata(svc_type, svc_id)
- osd_data = []
- cluster_data = []
- for default in defaults:
- osdmap = self.get("osd_map")['osds']
- value = 0
- for osd in osdmap:
- osd_id = osd['osd']
- metadata = self.get_metadata('osd', "%s" % osd_id)
- value += self.get_latest("osd", str(osd_id), "osd."+ str(default))
- point = {
+ for path, counter_info in counters.items():
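+                # Histogram counters have no single scalar value; skip them.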
+ if counter_info['type'] & PERFCOUNTER_HISTOGRAM:
+ continue
+
+ value = counter_info['value']
+
+ data.append({
"measurement": "ceph_osd_stats",
"tags": {
- "mgr_id": self.get_mgr_id(),
- "osd_id": osd_id,
- "type_instance": default,
+ "ceph_daemon": daemon,
+ "type_instance": path,
"host": metadata['hostname']
},
- "time" : datetime.utcnow().isoformat() + 'Z',
- "fields" : {
- "value": self.get_latest("osd", osd_id.__str__(), "osd."+ default.__str__())
- }
- }
- osd_data.append(point)
- point2 = {
- "measurement": "ceph_cluster_stats",
- "tags": {
- "mgr_id": self.get_mgr_id(),
- "type_instance": default,
- },
- "time" : datetime.utcnow().isoformat() + 'Z',
- "fields" : {
- "value": value
+ "time": datetime.utcnow().isoformat() + 'Z',
+ "fields": {
+ "value": value
}
- }
- cluster_data.append(point2)
- return osd_data, cluster_data
+ })
-
-
- def get_extended(self, counter_type, type_inst):
- path = "osd." + type_inst.__str__()
- osdmap = self.get("osd_map")
- data = []
- value = 0
- for osd in osdmap['osds']:
- osd_id = osd['osd']
- metadata = self.get_metadata('osd', "%s" % osd_id)
- value += self.get_latest("osd", osd_id.__str__(), path.__str__())
- point = {
- "measurement": "ceph_osd_stats",
- "tags": {
- "mgr_id": self.get_mgr_id(),
- "osd_id": osd_id,
- "type_instance": type_inst,
- "host": metadata['hostname']
- },
- "time" : datetime.utcnow().isoformat() + 'Z',
- "fields" : {
- "value": self.get_latest("osd", osd_id.__str__(), path.__str__())
- }
- }
- data.append(point)
- if counter_type == "cluster":
- point = [{
- "measurement": "ceph_cluster_stats",
- "tags": {
- "mgr_id": self.get_mgr_id(),
- "type_instance": type_inst,
- },
- "time" : datetime.utcnow().isoformat() + 'Z',
- "fields" : {
- "value": value
- }
- }]
- return point
- else:
- return data
+ return data
def send_to_influx(self):
config = SafeConfigParser()
stats = config.get('influx', 'stats').replace(' ', '').split(',')
client = InfluxDBClient(host, port, username, password, database)
databases_avail = client.get_list_database()
- default_stats = self.get_default_stat()
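+        # get_daemon_stats() replaces the per-OSD defaults and the
+        # 'extended' counters that were previously read from the config file.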
+ daemon_stats = self.get_daemon_stats()
for database_avail in databases_avail:
            if database_avail['name'] == database:
break
else:
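+            # for/else: only create the database if no existing one matched.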
client.create_database(database)
-
-
for stat in stats:
if stat == "pool":
client.write_points(self.get_df_stats(), 'ms')
elif stat == "osd":
- client.write_points(default_stats[0], 'ms')
- if config.has_option('extended', 'osd'):
- osds = config.get('extended', 'osd').replace(' ', '').split(',')
- for osd in osds:
- client.write_points(self.get_extended("osd", osd), 'ms')
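+                # Write every daemon's perf counters in one batch.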
+ client.write_points(daemon_stats, 'ms')
self.log.debug("wrote osd stats")
elif stat == "cluster":
- client.write_points(default_stats[-1], 'ms')
- if config.has_option('extended', 'cluster'):
- clusters = config.get('extended', 'cluster').replace(' ', '').split(',')
- for cluster in clusters:
- client.write_points(self.get_extended("cluster", cluster), 'ms')
self.log.debug("wrote cluster stats")
else:
self.log.error("invalid stat")