import json
import errno
import time
-from typing import Tuple
+from typing import cast, Any, Dict, Iterator, List, Optional, Tuple, Union
-from mgr_module import CLICommand, CLIReadCommand, CLIWriteCommand, MgrModule, Option
+from mgr_module import CLICommand, CLIReadCommand, CLIWriteCommand, MgrModule, Option, OptionValue
try:
from influxdb import InfluxDBClient
]
@property
- def config_keys(self):
+ def config_keys(self) -> Dict[str, OptionValue]:
return dict((o['name'], o.get('default', None))
for o in self.MODULE_OPTIONS)
}
]
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
self.event = Event()
self.run = True
- self.config = dict()
- self.workers = list()
- self.queue = queue.Queue(maxsize=100)
- self.health_checks = dict()
+ self.config: Dict[str, OptionValue] = dict()
+ self.workers: List[Thread] = list()
+ self.queue: 'queue.Queue[Optional[List[Dict[str, str]]]]' = queue.Queue(maxsize=100)
+ self.health_checks: Dict[str, Dict[str, Any]] = dict()
- def get_fsid(self):
+ def get_fsid(self) -> str:
return self.get('mon_map')['fsid']
@staticmethod
- def can_run():
+ def can_run() -> Tuple[bool, str]:
if InfluxDBClient is not None:
return True, ""
else:
return False, "influxdb python module not found"
@staticmethod
- def get_timestamp():
+ def get_timestamp() -> str:
return datetime.utcnow().isoformat() + 'Z'
@staticmethod
- def chunk(l, n):
+ def chunk(l: Iterator[Dict[str, str]], n: int) -> Iterator[List[Dict[str, str]]]:
try:
while True:
xs = []
except StopIteration:
yield xs
- def queue_worker(self):
+ def queue_worker(self) -> None:
while True:
try:
points = self.queue.get()
finally:
self.queue.task_done()
- def get_latest(self, daemon_type, daemon_name, stat):
+ def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
data = self.get_counter(daemon_type, daemon_name, stat)[stat]
if data:
return data[-1][1]
return 0
- def get_df_stats(self, now):
+ def get_df_stats(self, now) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
df = self.get("df")
data = []
pool_info = {}
pool_info.update({str(pool['id']):pool['name']})
return data, pool_info
- def get_pg_summary_osd(self, pool_info, now):
+ def get_pg_summary_osd(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
pg_sum = self.get('pg_summary')
osd_sum = pg_sum['by_osd']
for osd_id, stats in osd_sum.items():
}
}
- def get_pg_summary_pool(self, pool_info, now):
+ def get_pg_summary_pool(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
pool_sum = self.get('pg_summary')['by_pool']
for pool_id, stats in pool_sum.items():
for stat in stats:
}
}
- def get_daemon_stats(self, now):
+ def get_daemon_stats(self, now: str) -> Iterator[Dict[str, Any]]:
for daemon, counters in self.get_all_perf_counters().items():
svc_type, svc_id = daemon.split(".", 1)
metadata = self.get_metadata(svc_type, svc_id)
}
}
- def set_config_option(self, option, value):
+ def set_config_option(self, option: str, value: str) -> None:
if option not in self.config_keys.keys():
raise RuntimeError('{0} is a unknown configuration '
'option'.format(option))
self.config[option] = value
- def init_module_config(self):
+ def init_module_config(self) -> None:
self.config['hostname'] = \
self.get_module_option("hostname", default=self.config_keys['hostname'])
self.config['port'] = \
- int(self.get_module_option("port", default=self.config_keys['port']))
+ cast(int, self.get_module_option("port", default=self.config_keys['port']))
self.config['database'] = \
self.get_module_option("database", default=self.config_keys['database'])
self.config['username'] = \
self.config['password'] = \
self.get_module_option("password", default=self.config_keys['password'])
self.config['interval'] = \
- int(self.get_module_option("interval",
- default=self.config_keys['interval']))
+ cast(int, self.get_module_option("interval",
+ default=self.config_keys['interval']))
self.config['threads'] = \
- int(self.get_module_option("threads",
- default=self.config_keys['threads']))
+ cast(int, self.get_module_option("threads",
+ default=self.config_keys['threads']))
self.config['batch_size'] = \
- int(self.get_module_option("batch_size",
- default=self.config_keys['batch_size']))
- ssl = self.get_module_option("ssl", default=self.config_keys['ssl'])
+ cast(int, self.get_module_option("batch_size",
+ default=self.config_keys['batch_size']))
+ ssl = cast(str, self.get_module_option("ssl", default=self.config_keys['ssl']))
self.config['ssl'] = ssl.lower() == 'true'
verify_ssl = \
- self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl'])
+ cast(str, self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl']))
self.config['verify_ssl'] = verify_ssl.lower() == 'true'
- def gather_statistics(self):
+ def gather_statistics(self) -> Iterator[Dict[str, str]]:
now = self.get_timestamp()
df_stats, pools = self.get_df_stats(now)
return chain(df_stats, self.get_daemon_stats(now),
self.get_pg_summary_pool(pools, now))
@contextmanager
- def get_influx_client(self):
+ def get_influx_client(self) -> Iterator['InfluxDBClient']:
client = InfluxDBClient(self.config['hostname'],
self.config['port'],
self.config['username'],
# influxdb older than v5.0.0
pass
- def send_to_influx(self):
+ def send_to_influx(self) -> bool:
if not self.config['hostname']:
self.log.error("No Influx server configured, please set one using: "
"ceph influx config-set hostname <hostname>")
self.log.debug('Gathering statistics')
points = self.gather_statistics()
- for chunk in self.chunk(points, self.config['batch_size']):
+ for chunk in self.chunk(points, cast(int, self.config['batch_size'])):
self.queue.put(chunk, block=False)
self.log.debug('Queue currently contains %d items',
self.queue.qsize())
+ return True
except queue.Full:
self.health_checks.update({
'MGR_INFLUX_QUEUE_FULL': {
}
})
self.log.error('Queue is full, failed to add chunk')
+ return False
except (RequestException, InfluxDBClientError) as e:
self.health_checks.update({
'MGR_INFLUX_DB_LIST_FAILED': {
finally:
self.set_health_checks(self.health_checks)
- def shutdown(self):
+ def shutdown(self) -> None:
self.log.info('Stopping influx module')
self.run = False
self.event.set()
for worker in self.workers:
worker.join()
- def self_test(self):
+ def self_test(self) -> Optional[str]:
now = self.get_timestamp()
daemon_stats = list(self.get_daemon_stats(now))
assert len(daemon_stats)
self.send_to_influx()
return 0, 'Sending data to Influx', ''
- def serve(self):
+ def serve(self) -> None:
if InfluxDBClient is None:
self.log.error("Cannot transmit statistics: influxdb python "
"module not found. Did you install it?")
self.log.debug('Starting %d queue worker threads',
self.config['threads'])
- for i in range(self.config['threads']):
+ for i in range(cast(int, self.config['threads'])):
worker = Thread(target=self.queue_worker, args=())
worker.setDaemon(True)
worker.start()
self.log.debug('Finished sending data to Influx in %.3f seconds',
runtime)
self.log.debug("Sleeping for %d seconds", self.config['interval'])
- self.event.wait(self.config['interval'])
+ self.event.wait(cast(float, self.config['interval']))