From aca45d7d08fd8c3f32849331eba4620e2726282a Mon Sep 17 00:00:00 2001
From: Kefu Chai
Date: Mon, 22 Feb 2021 11:45:30 +0800
Subject: [PATCH] mgr/influx: add typing annotation

Signed-off-by: Kefu Chai
---
 src/mypy.ini                    |  3 ++
 src/pybind/mgr/cephadm/serve.py |  3 +-
 src/pybind/mgr/influx/module.py | 78 +++++++++++++++++----------------
 src/pybind/mgr/tox.ini          |  1 +
 4 files changed, 46 insertions(+), 39 deletions(-)

diff --git a/src/mypy.ini b/src/mypy.ini
index d00cdecaed4..6bbefb44cca 100755
--- a/src/mypy.ini
+++ b/src/mypy.ini
@@ -110,6 +110,9 @@ ignore_missing_imports = True
 
 [mypy-bcrypt]
 ignore_missing_imports = True
 
+[mypy-influxdb.*]
+ignore_missing_imports = True
+
 [mypy-numpy.*]
 ignore_missing_imports = True
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
index d253f5e67ea..2af5553f2d7 100644
--- a/src/pybind/mgr/cephadm/serve.py
+++ b/src/pybind/mgr/cephadm/serve.py
@@ -391,7 +391,8 @@ class CephadmServe:
             name = '%s.%s' % (s.get('type'), daemon_id)
             if s.get('type') == 'rbd-mirror':
                 metadata = self.mgr.get_metadata(
-                    "rbd-mirror", daemon_id)
+                    "rbd-mirror", daemon_id, {})
+                assert metadata is not None
                 try:
                     name = '%s.%s' % (s.get('type'), metadata['id'])
                 except (KeyError, TypeError):
diff --git a/src/pybind/mgr/influx/module.py b/src/pybind/mgr/influx/module.py
index f0526d85fbc..5d0682a8b88 100644
--- a/src/pybind/mgr/influx/module.py
+++ b/src/pybind/mgr/influx/module.py
@@ -6,9 +6,9 @@ import queue
 import json
 import errno
 import time
-from typing import Tuple
+from typing import cast, Any, Dict, Iterator, List, Optional, Tuple, Union
 
-from mgr_module import CLICommand, CLIReadCommand, CLIWriteCommand, MgrModule, Option
+from mgr_module import CLICommand, CLIReadCommand, CLIWriteCommand, MgrModule, Option, OptionValue
 
 try:
     from influxdb import InfluxDBClient
@@ -50,7 +50,7 @@ class Module(MgrModule):
     ]
 
     @property
-    def config_keys(self):
+    def config_keys(self) -> Dict[str, OptionValue]:
         return dict((o['name'], o.get('default', None))
                     for o in self.MODULE_OPTIONS)
 
@@ -73,31 +73,31 @@ class Module(MgrModule):
         }
     ]
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         super(Module, self).__init__(*args, **kwargs)
         self.event = Event()
         self.run = True
-        self.config = dict()
-        self.workers = list()
-        self.queue = queue.Queue(maxsize=100)
-        self.health_checks = dict()
+        self.config: Dict[str, OptionValue] = dict()
+        self.workers: List[Thread] = list()
+        self.queue: 'queue.Queue[Optional[List[Dict[str, str]]]]' = queue.Queue(maxsize=100)
+        self.health_checks: Dict[str, Dict[str, Any]] = dict()
 
-    def get_fsid(self):
+    def get_fsid(self) -> str:
         return self.get('mon_map')['fsid']
 
     @staticmethod
-    def can_run():
+    def can_run() -> Tuple[bool, str]:
         if InfluxDBClient is not None:
             return True, ""
         else:
             return False, "influxdb python module not found"
 
     @staticmethod
-    def get_timestamp():
+    def get_timestamp() -> str:
         return datetime.utcnow().isoformat() + 'Z'
 
     @staticmethod
-    def chunk(l, n):
+    def chunk(l: Iterator[Dict[str, str]], n: int) -> Iterator[List[Dict[str, str]]]:
         try:
             while True:
                 xs = []
@@ -107,7 +107,7 @@ class Module(MgrModule):
         except StopIteration:
             yield xs
 
-    def queue_worker(self):
+    def queue_worker(self) -> None:
         while True:
             try:
                 points = self.queue.get()
@@ -150,14 +150,14 @@ class Module(MgrModule):
             finally:
                 self.queue.task_done()
 
-    def get_latest(self, daemon_type, daemon_name, stat):
+    def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
         data = self.get_counter(daemon_type, daemon_name, stat)[stat]
         if data:
             return data[-1][1]
 
         return 0
 
-    def get_df_stats(self, now):
+    def get_df_stats(self, now) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
         df = self.get("df")
         data = []
         pool_info = {}
@@ -196,7 +196,7 @@ class Module(MgrModule):
             pool_info.update({str(pool['id']):pool['name']})
         return data, pool_info
 
-    def get_pg_summary_osd(self, pool_info, now):
+    def get_pg_summary_osd(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
         pg_sum = self.get('pg_summary')
         osd_sum = pg_sum['by_osd']
         for osd_id, stats in osd_sum.items():
@@ -218,7 +218,7 @@ class Module(MgrModule):
                 }
             }
 
-    def get_pg_summary_pool(self, pool_info, now):
+    def get_pg_summary_pool(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
         pool_sum = self.get('pg_summary')['by_pool']
         for pool_id, stats in pool_sum.items():
             for stat in stats:
@@ -235,7 +235,7 @@ class Module(MgrModule):
                 }
             }
 
-    def get_daemon_stats(self, now):
+    def get_daemon_stats(self, now: str) -> Iterator[Dict[str, Any]]:
         for daemon, counters in self.get_all_perf_counters().items():
             svc_type, svc_id = daemon.split(".", 1)
             metadata = self.get_metadata(svc_type, svc_id)
@@ -260,7 +260,7 @@ class Module(MgrModule):
                 }
             }
 
-    def set_config_option(self, option, value):
+    def set_config_option(self, option: str, value: str) -> None:
         if option not in self.config_keys.keys():
             raise RuntimeError('{0} is a unknown configuration '
                                'option'.format(option))
@@ -284,11 +284,11 @@ class Module(MgrModule):
 
         self.config[option] = value
 
-    def init_module_config(self):
+    def init_module_config(self) -> None:
         self.config['hostname'] = \
             self.get_module_option("hostname", default=self.config_keys['hostname'])
         self.config['port'] = \
-            int(self.get_module_option("port", default=self.config_keys['port']))
+            cast(int, self.get_module_option("port", default=self.config_keys['port']))
         self.config['database'] = \
             self.get_module_option("database", default=self.config_keys['database'])
         self.config['username'] = \
@@ -296,21 +296,21 @@ class Module(MgrModule):
         self.config['password'] = \
             self.get_module_option("password", default=self.config_keys['password'])
         self.config['interval'] = \
-            int(self.get_module_option("interval",
-                                       default=self.config_keys['interval']))
+            cast(int, self.get_module_option("interval",
+                                             default=self.config_keys['interval']))
         self.config['threads'] = \
-            int(self.get_module_option("threads",
-                                       default=self.config_keys['threads']))
+            cast(int, self.get_module_option("threads",
+                                             default=self.config_keys['threads']))
         self.config['batch_size'] = \
-            int(self.get_module_option("batch_size",
-                                       default=self.config_keys['batch_size']))
+            cast(int, self.get_module_option("batch_size",
+                                             default=self.config_keys['batch_size']))
-        ssl = self.get_module_option("ssl", default=self.config_keys['ssl'])
+        ssl = cast(str, self.get_module_option("ssl", default=self.config_keys['ssl']))
         self.config['ssl'] = ssl.lower() == 'true'
         verify_ssl = \
-            self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl'])
+            cast(str, self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl']))
         self.config['verify_ssl'] = verify_ssl.lower() == 'true'
 
-    def gather_statistics(self):
+    def gather_statistics(self) -> Iterator[Dict[str, str]]:
         now = self.get_timestamp()
         df_stats, pools = self.get_df_stats(now)
         return chain(df_stats, self.get_daemon_stats(now),
@@ -318,7 +318,7 @@ class Module(MgrModule):
                      self.get_pg_summary_pool(pools, now))
 
     @contextmanager
-    def get_influx_client(self):
+    def get_influx_client(self) -> Iterator['InfluxDBClient']:
         client = InfluxDBClient(self.config['hostname'],
                                 self.config['port'],
                                 self.config['username'],
@@ -335,7 +335,7 @@ class Module(MgrModule):
             # influxdb older than v5.0.0
             pass
 
-    def send_to_influx(self):
+    def send_to_influx(self) -> bool:
         if not self.config['hostname']:
             self.log.error("No Influx server configured, please set one using: "
                            "ceph influx config-set hostname <hostname>")
@@ -371,11 +371,12 @@ class Module(MgrModule):
             self.log.debug('Gathering statistics')
             points = self.gather_statistics()
 
-            for chunk in self.chunk(points, self.config['batch_size']):
+            for chunk in self.chunk(points, cast(int, self.config['batch_size'])):
                 self.queue.put(chunk, block=False)
 
             self.log.debug('Queue currently contains %d items',
                            self.queue.qsize())
+            return True
         except queue.Full:
             self.health_checks.update({
                 'MGR_INFLUX_QUEUE_FULL': {
@@ -386,6 +387,7 @@ class Module(MgrModule):
                 }
             })
             self.log.error('Queue is full, failed to add chunk')
+            return False
         except (RequestException, InfluxDBClientError) as e:
             self.health_checks.update({
                 'MGR_INFLUX_DB_LIST_FAILED': {
@@ -399,7 +401,7 @@ class Module(MgrModule):
             finally:
                 self.set_health_checks(self.health_checks)
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         self.log.info('Stopping influx module')
         self.run = False
         self.event.set()
@@ -413,7 +415,7 @@ class Module(MgrModule):
         for worker in self.workers:
             worker.join()
 
-    def self_test(self):
+    def self_test(self) -> Optional[str]:
         now = self.get_timestamp()
         daemon_stats = list(self.get_daemon_stats(now))
         assert len(daemon_stats)
@@ -454,7 +456,7 @@ class Module(MgrModule):
         self.send_to_influx()
         return 0, 'Sending data to Influx', ''
 
-    def serve(self):
+    def serve(self) -> None:
         if InfluxDBClient is None:
             self.log.error("Cannot transmit statistics: influxdb python "
                            "module not found. Did you install it?")
@@ -466,7 +468,7 @@ class Module(MgrModule):
 
         self.log.debug('Starting %d queue worker threads',
                        self.config['threads'])
-        for i in range(self.config['threads']):
+        for i in range(cast(int, self.config['threads'])):
             worker = Thread(target=self.queue_worker, args=())
             worker.setDaemon(True)
             worker.start()
@@ -479,4 +481,4 @@ class Module(MgrModule):
             self.log.debug('Finished sending data to Influx in %.3f seconds',
                            runtime)
             self.log.debug("Sleeping for %d seconds", self.config['interval'])
-            self.event.wait(self.config['interval'])
+            self.event.wait(cast(float, self.config['interval']))
diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini
index 81ada366aea..f4ab7d01d29 100644
--- a/src/pybind/mgr/tox.ini
+++ b/src/pybind/mgr/tox.ini
@@ -67,6 +67,7 @@ commands =
    -m dashboard \
    -m devicehealth \
    -m hello \
+   -m influx \
    -m iostat \
    -m mds_autoscaler \
    -m mgr_module \
-- 
2.39.5
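
Note (appended after the version trailer, so it is not applied by git am): the module.py hunks above rely on typing.cast() to narrow the broad OptionValue union returned by MgrModule.get_module_option() to the concrete type each option needs. The short, self-contained sketch below illustrates that pattern outside the manager framework; the simplified OptionValue alias, the stub get_module_option(), and the example defaults are assumptions made for this illustration, not the upstream definitions.

    from typing import Optional, Union, cast

    # Rough stand-in for mgr_module.OptionValue (an assumption for this sketch):
    # module options are plain scalars, so the getter can only promise a union type.
    OptionValue = Optional[Union[bool, int, float, str]]

    def get_module_option(name: str, default: OptionValue = None) -> OptionValue:
        # Stub for MgrModule.get_module_option(); simply returns the default here.
        return default

    # cast() tells mypy which member of the union a given option actually holds,
    # without performing any runtime conversion -- the same idea as the
    # cast(int, ...) and cast(str, ...) calls in init_module_config() above.
    port = cast(int, get_module_option("port", default=8086))
    ssl = cast(str, get_module_option("ssl", default="false"))
    use_ssl: bool = ssl.lower() == "true"

Because cast() is a no-op at runtime, this keeps the existing behaviour unchanged while letting mypy check the narrowed types downstream.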