]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/influx: add typing annotation
authorKefu Chai <kchai@redhat.com>
Mon, 22 Feb 2021 03:45:30 +0000 (11:45 +0800)
committerKefu Chai <kchai@redhat.com>
Thu, 25 Feb 2021 07:54:12 +0000 (15:54 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/mypy.ini
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/influx/module.py
src/pybind/mgr/tox.ini

index d00cdecaed47e6dcd3d2e82852c0e22b0308ae43..6bbefb44ccad82a24024ef7f1c3745690723339e 100755 (executable)
@@ -110,6 +110,9 @@ ignore_missing_imports = True
 [mypy-bcrypt]
 ignore_missing_imports = True
 
+[mypy-influxdb.*]
+ignore_missing_imports = True
+
 [mypy-numpy.*]
 ignore_missing_imports = True
 
index d253f5e67eabbd8ed1769b0d5ae00df3e0a27b1a..2af5553f2d7cd59be608b5968ee4556a82d28130 100644 (file)
@@ -391,7 +391,8 @@ class CephadmServe:
                     name = '%s.%s' % (s.get('type'), daemon_id)
                     if s.get('type') == 'rbd-mirror':
                         metadata = self.mgr.get_metadata(
-                            "rbd-mirror", daemon_id)
+                            "rbd-mirror", daemon_id, {})
+                        assert metadata is not None
                         try:
                             name = '%s.%s' % (s.get('type'), metadata['id'])
                         except (KeyError, TypeError):
index f0526d85fbc4975d359a9f275932c2f7026dc13d..5d0682a8b88acbc123a52dbd0e84f56313b3530c 100644 (file)
@@ -6,9 +6,9 @@ import queue
 import json
 import errno
 import time
-from typing import Tuple
+from typing import cast, Any, Dict, Iterator, List, Optional, Tuple, Union
 
-from mgr_module import CLICommand, CLIReadCommand, CLIWriteCommand, MgrModule, Option
+from mgr_module import CLICommand, CLIReadCommand, CLIWriteCommand, MgrModule, Option, OptionValue
 
 try:
     from influxdb import InfluxDBClient
@@ -50,7 +50,7 @@ class Module(MgrModule):
     ]
 
     @property
-    def config_keys(self):
+    def config_keys(self) -> Dict[str, OptionValue]:
         return dict((o['name'], o.get('default', None))
                 for o in self.MODULE_OPTIONS)
 
@@ -73,31 +73,31 @@ class Module(MgrModule):
         }
     ]
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         super(Module, self).__init__(*args, **kwargs)
         self.event = Event()
         self.run = True
-        self.config = dict()
-        self.workers = list()
-        self.queue = queue.Queue(maxsize=100)
-        self.health_checks = dict()
+        self.config: Dict[str, OptionValue] = dict()
+        self.workers: List[Thread] = list()
+        self.queue: 'queue.Queue[Optional[List[Dict[str, str]]]]' = queue.Queue(maxsize=100)
+        self.health_checks: Dict[str, Dict[str, Any]] = dict()
 
-    def get_fsid(self):
+    def get_fsid(self) -> str:
         return self.get('mon_map')['fsid']
 
     @staticmethod
-    def can_run():
+    def can_run() -> Tuple[bool, str]:
         if InfluxDBClient is not None:
             return True, ""
         else:
             return False, "influxdb python module not found"
 
     @staticmethod
-    def get_timestamp():
+    def get_timestamp() -> str:
         return datetime.utcnow().isoformat() + 'Z'
 
     @staticmethod
-    def chunk(l, n):
+    def chunk(l: Iterator[Dict[str, str]], n: int) -> Iterator[List[Dict[str, str]]]:
         try:
             while True:
                 xs = []
@@ -107,7 +107,7 @@ class Module(MgrModule):
         except StopIteration:
             yield xs
 
-    def queue_worker(self):
+    def queue_worker(self) -> None:
         while True:
             try:
                 points = self.queue.get()
@@ -150,14 +150,14 @@ class Module(MgrModule):
             finally:
                 self.queue.task_done()
 
-    def get_latest(self, daemon_type, daemon_name, stat):
+    def get_latest(self, daemon_type: str, daemon_name: str, stat: str) -> int:
         data = self.get_counter(daemon_type, daemon_name, stat)[stat]
         if data:
             return data[-1][1]
 
         return 0
 
-    def get_df_stats(self, now):
+    def get_df_stats(self, now) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
         df = self.get("df")
         data = []
         pool_info = {}
@@ -196,7 +196,7 @@ class Module(MgrModule):
                 pool_info.update({str(pool['id']):pool['name']})
         return data, pool_info
 
-    def get_pg_summary_osd(self, pool_info, now):
+    def get_pg_summary_osd(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
         pg_sum = self.get('pg_summary')
         osd_sum = pg_sum['by_osd']
         for osd_id, stats in osd_sum.items():
@@ -218,7 +218,7 @@ class Module(MgrModule):
                     }
                 }
 
-    def get_pg_summary_pool(self, pool_info, now):
+    def get_pg_summary_pool(self, pool_info: Dict[str, str], now: str) -> Iterator[Dict[str, Any]]:
         pool_sum = self.get('pg_summary')['by_pool']
         for pool_id, stats in pool_sum.items():
             for stat in stats:
@@ -235,7 +235,7 @@ class Module(MgrModule):
                     }
                 }
 
-    def get_daemon_stats(self, now):
+    def get_daemon_stats(self, now: str) -> Iterator[Dict[str, Any]]:
         for daemon, counters in self.get_all_perf_counters().items():
             svc_type, svc_id = daemon.split(".", 1)
             metadata = self.get_metadata(svc_type, svc_id)
@@ -260,7 +260,7 @@ class Module(MgrModule):
                     }
                 }
 
-    def set_config_option(self, option, value):
+    def set_config_option(self, option: str, value: str) -> None:
         if option not in self.config_keys.keys():
             raise RuntimeError('{0} is a unknown configuration '
                                'option'.format(option))
@@ -284,11 +284,11 @@ class Module(MgrModule):
 
         self.config[option] = value
 
-    def init_module_config(self):
+    def init_module_config(self) -> None:
         self.config['hostname'] = \
             self.get_module_option("hostname", default=self.config_keys['hostname'])
         self.config['port'] = \
-            int(self.get_module_option("port", default=self.config_keys['port']))
+            cast(int, self.get_module_option("port", default=self.config_keys['port']))
         self.config['database'] = \
             self.get_module_option("database", default=self.config_keys['database'])
         self.config['username'] = \
@@ -296,21 +296,21 @@ class Module(MgrModule):
         self.config['password'] = \
             self.get_module_option("password", default=self.config_keys['password'])
         self.config['interval'] = \
-            int(self.get_module_option("interval",
-                                default=self.config_keys['interval']))
+            cast(int, self.get_module_option("interval",
+                                             default=self.config_keys['interval']))
         self.config['threads'] = \
-            int(self.get_module_option("threads",
-                                default=self.config_keys['threads']))
+            cast(int, self.get_module_option("threads",
+                                             default=self.config_keys['threads']))
         self.config['batch_size'] = \
-            int(self.get_module_option("batch_size",
-                                default=self.config_keys['batch_size']))
-        ssl = self.get_module_option("ssl", default=self.config_keys['ssl'])
+            cast(int, self.get_module_option("batch_size",
+                                             default=self.config_keys['batch_size']))
+        ssl = cast(str, self.get_module_option("ssl", default=self.config_keys['ssl']))
         self.config['ssl'] = ssl.lower() == 'true'
         verify_ssl = \
-            self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl'])
+            cast(str, self.get_module_option("verify_ssl", default=self.config_keys['verify_ssl']))
         self.config['verify_ssl'] = verify_ssl.lower() == 'true'
 
-    def gather_statistics(self):
+    def gather_statistics(self) -> Iterator[Dict[str, str]]:
         now = self.get_timestamp()
         df_stats, pools = self.get_df_stats(now)
         return chain(df_stats, self.get_daemon_stats(now),
@@ -318,7 +318,7 @@ class Module(MgrModule):
                      self.get_pg_summary_pool(pools, now))
 
     @contextmanager
-    def get_influx_client(self):
+    def get_influx_client(self) -> Iterator['InfluxDBClient']:
         client = InfluxDBClient(self.config['hostname'],
                                 self.config['port'],
                                 self.config['username'],
@@ -335,7 +335,7 @@ class Module(MgrModule):
                 # influxdb older than v5.0.0
                 pass
 
-    def send_to_influx(self):
+    def send_to_influx(self) -> bool:
         if not self.config['hostname']:
             self.log.error("No Influx server configured, please set one using: "
                            "ceph influx config-set hostname <hostname>")
@@ -371,11 +371,12 @@ class Module(MgrModule):
 
             self.log.debug('Gathering statistics')
             points = self.gather_statistics()
-            for chunk in self.chunk(points, self.config['batch_size']):
+            for chunk in self.chunk(points, cast(int, self.config['batch_size'])):
                 self.queue.put(chunk, block=False)
 
             self.log.debug('Queue currently contains %d items',
                            self.queue.qsize())
+            return True
         except queue.Full:
             self.health_checks.update({
                 'MGR_INFLUX_QUEUE_FULL': {
@@ -386,6 +387,7 @@ class Module(MgrModule):
                 }
             })
             self.log.error('Queue is full, failed to add chunk')
+            return False
         except (RequestException, InfluxDBClientError) as e:
             self.health_checks.update({
                 'MGR_INFLUX_DB_LIST_FAILED': {
@@ -399,7 +401,7 @@ class Module(MgrModule):
         finally:
             self.set_health_checks(self.health_checks)
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         self.log.info('Stopping influx module')
         self.run = False
         self.event.set()
@@ -413,7 +415,7 @@ class Module(MgrModule):
         for worker in self.workers:
             worker.join()
 
-    def self_test(self):
+    def self_test(self) -> Optional[str]:
         now = self.get_timestamp()
         daemon_stats = list(self.get_daemon_stats(now))
         assert len(daemon_stats)
@@ -454,7 +456,7 @@ class Module(MgrModule):
         self.send_to_influx()
         return 0, 'Sending data to Influx', ''
 
-    def serve(self):
+    def serve(self) -> None:
         if InfluxDBClient is None:
             self.log.error("Cannot transmit statistics: influxdb python "
                            "module not found.  Did you install it?")
@@ -466,7 +468,7 @@ class Module(MgrModule):
 
         self.log.debug('Starting %d queue worker threads',
                        self.config['threads'])
-        for i in range(self.config['threads']):
+        for i in range(cast(int, self.config['threads'])):
             worker = Thread(target=self.queue_worker, args=())
             worker.setDaemon(True)
             worker.start()
@@ -479,4 +481,4 @@ class Module(MgrModule):
             self.log.debug('Finished sending data to Influx in %.3f seconds',
                            runtime)
             self.log.debug("Sleeping for %d seconds", self.config['interval'])
-            self.event.wait(self.config['interval'])
+            self.event.wait(cast(float, self.config['interval']))
index 81ada366aeaa992358cf0f6485ac76adeaeef9f8..f4ab7d01d293ba84fe784a54c684837429519368 100644 (file)
@@ -67,6 +67,7 @@ commands =
            -m dashboard \
            -m devicehealth \
            -m hello \
+           -m influx \
            -m iostat \
            -m mds_autoscaler \
            -m mgr_module \