]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr/telegraf: add type annotations 40304/head
authorKefu Chai <kchai@redhat.com>
Mon, 22 Mar 2021 10:06:03 +0000 (18:06 +0800)
committerKefu Chai <kchai@redhat.com>
Tue, 23 Mar 2021 08:58:33 +0000 (16:58 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/mypy.ini
src/pybind/mgr/telegraf/basesocket.py
src/pybind/mgr/telegraf/module.py
src/pybind/mgr/telegraf/protocol.py
src/pybind/mgr/telegraf/utils.py
src/pybind/mgr/tox.ini

index b2882f861851bb9c3d55ac4c15f46be036e6ad6b..9413d529a6c36a044de136192097c8e3f5fc9785 100755 (executable)
@@ -71,6 +71,9 @@ disallow_untyped_defs = False
 [mypy-snap_schedule.*]
 disallow_untyped_defs = True
 
+[mypy-telegraf.*]
+disallow_untyped_defs = True
+
 [mypy-status.*]
 disallow_untyped_defs = True
 
index 435994661251c1c54a8985bcde1818bf39e5f617..5caea3be72596d8a920474ce7cbff9018cb60f27 100644 (file)
@@ -1,4 +1,6 @@
 import socket
+from urllib.parse import ParseResult
+from typing import Any, Dict, Optional, Tuple, Union
 
 
 class BaseSocket(object):
@@ -11,7 +13,7 @@ class BaseSocket(object):
         'udp6': (socket.AF_INET6, socket.SOCK_DGRAM),
     }
 
-    def __init__(self, url):
+    def __init__(self, url: ParseResult) -> None:
         self.url = url
 
         try:
@@ -21,25 +23,27 @@ class BaseSocket(object):
 
         self.sock = socket.socket(family=socket_family, type=socket_type)
         if self.sock.family == socket.AF_UNIX:
-            self.address = self.url.path
+            self.address: Union[str, Tuple[str, int]] = self.url.path
         else:
+            assert self.url.hostname
+            assert self.url.port
             self.address = (self.url.hostname, self.url.port)
 
-    def connect(self):
+    def connect(self) -> None:
         return self.sock.connect(self.address)
 
-    def close(self):
+    def close(self) -> None:
         self.sock.close()
 
-    def send(self, data, flags=0):
+    def send(self, data: str, flags: int = 0) -> int:
         return self.sock.send(data.encode('utf-8') + b'\n', flags)
 
-    def __del__(self):
+    def __del__(self) -> None:
         self.sock.close()
 
-    def __enter__(self):
+    def __enter__(self) -> 'BaseSocket':
         self.connect()
         return self
 
-    def __exit__(self, exc_type, exc_val, exc_tb):
+    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
         self.close()
index 1421fce02fc44b3592383333a629a22d3b21c696..f640f1d3a0fd82f5ec1768472bb36e0155bde7ce 100644 (file)
@@ -7,9 +7,9 @@ from threading import Event
 
 from telegraf.basesocket import BaseSocket
 from telegraf.protocol import Line
-from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, PG_STATES
+from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, PG_STATES
 
-from typing import Tuple
+from typing import cast, Any, Dict, Iterable, Optional, Tuple
 from urllib.parse import urlparse
 
 
@@ -24,23 +24,23 @@ class Module(MgrModule):
     ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
 
     @property
-    def config_keys(self):
+    def config_keys(self) -> Dict[str, OptionValue]:
         return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         super(Module, self).__init__(*args, **kwargs)
         self.event = Event()
         self.run = True
-        self.fsid = None
-        self.config = dict()
+        self.fsid: Optional[str] = None
+        self.config: Dict[str, OptionValue] = dict()
 
-    def get_fsid(self):
+    def get_fsid(self) -> str:
         if not self.fsid:
             self.fsid = self.get('mon_map')['fsid']
-
+        assert self.fsid is not None
         return self.fsid
 
-    def get_pool_stats(self):
+    def get_pool_stats(self) -> Iterable[Dict[str, Any]]:
         df = self.get('df')
 
         df_types = [
@@ -71,7 +71,7 @@ class Module(MgrModule):
                     'value': pool['stats'][df_type],
                 }
 
-    def get_daemon_stats(self):
+    def get_daemon_stats(self) -> Iterable[Dict[str, Any]]:
         for daemon, counters in self.get_all_perf_counters().items():
             svc_type, svc_id = daemon.split('.', 1)
             metadata = self.get_metadata(svc_type, svc_id)
@@ -93,7 +93,7 @@ class Module(MgrModule):
                     'value': counter_info['value']
                 }
 
-    def get_pg_stats(self):
+    def get_pg_stats(self) -> Dict[str, int]:
         stats = dict()
 
         pg_status = self.get('pg_status')
@@ -114,7 +114,7 @@ class Module(MgrModule):
 
         return stats
 
-    def get_cluster_stats(self):
+    def get_cluster_stats(self) -> Iterable[Dict[str, Any]]:
         stats = dict()
 
         health = json.loads(self.get('health')['json'])
@@ -159,11 +159,12 @@ class Module(MgrModule):
             num_mds_up += len(fs['mdsmap']['up'])
 
         stats['num_mds_up'] = num_mds_up
-        stats['num_mds'] = num_mds_up + stats['num_mds_standby']
+        stats['num_mds'] = num_mds_up + cast(int, stats['num_mds_standby'])
 
         stats.update(self.get_pg_stats())
 
         for key, value in stats.items():
+            assert value is not None
             yield {
                 'measurement': 'ceph_cluster_stats',
                 'tags': {
@@ -173,42 +174,43 @@ class Module(MgrModule):
                 'value': int(value)
             }
 
-    def set_config_option(self, option, value):
+    def set_config_option(self, option: str, value: str) -> None:
         if option not in self.config_keys.keys():
             raise RuntimeError('{0} is a unknown configuration '
                                'option'.format(option))
 
-        if option in ['interval']:
+        if option == 'interval':
             try:
-                value = int(value)
+                interval = int(value)
             except (ValueError, TypeError):
                 raise RuntimeError('invalid {0} configured. Please specify '
                                    'a valid integer'.format(option))
+            if interval < 5:
+                raise RuntimeError('interval should be set to at least 5 seconds')
+            self.config[option] = interval
+        else:
+            self.config[option] = value
 
-        if option == 'interval' and value < 5:
-            raise RuntimeError('interval should be set to at least 5 seconds')
-
-        self.config[option] = value
-
-    def init_module_config(self):
+    def init_module_config(self) -> None:
         self.config['address'] = \
             self.get_module_option("address", default=self.config_keys['address'])
-        self.config['interval'] = \
-            int(self.get_module_option("interval",
-                                default=self.config_keys['interval']))
+        interval = self.get_module_option("interval",
+                                          default=self.config_keys['interval'])
+        assert interval
+        self.config['interval'] = int(interval)
 
-    def now(self):
+    def now(self) -> int:
         return int(round(time.time() * 1000000000))
 
-    def gather_measurements(self):
+    def gather_measurements(self) -> Iterable[Dict[str, Any]]:
         return itertools.chain(
             self.get_pool_stats(),
             self.get_daemon_stats(),
             self.get_cluster_stats()
         )
 
-    def send_to_telegraf(self):
-        url = urlparse(self.config['address'])
+    def send_to_telegraf(self) -> None:
+        url = urlparse(cast(str, self.config['address']))
 
         sock = BaseSocket(url)
         self.log.debug('Sending data to Telegraf at %s', sock.address)
@@ -227,7 +229,7 @@ class Module(MgrModule):
         except FileNotFoundError:
             self.log.exception('Failed to open Telegraf at: %s', url.geturl())
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         self.log.info('Stopping Telegraf module')
         self.run = False
         self.event.set()
@@ -259,12 +261,12 @@ class Module(MgrModule):
         self.send_to_telegraf()
         return 0, 'Sending data to Telegraf', ''
 
-    def self_test(self):
+    def self_test(self) -> None:
         measurements = list(self.gather_measurements())
         if len(measurements) == 0:
             raise RuntimeError('No measurements found')
 
-    def serve(self):
+    def serve(self) -> None:
         self.log.info('Starting Telegraf module')
         self.init_module_config()
         self.run = True
@@ -278,4 +280,4 @@ class Module(MgrModule):
             runtime = (self.now() - start) / 1000000
             self.log.debug('Sending data to Telegraf took %d ms', runtime)
             self.log.debug("Sleeping for %d seconds", self.config['interval'])
-            self.event.wait(self.config['interval'])
+            self.event.wait(cast(int, self.config['interval']))
index 80a25ff43ac9b3ae7dccccb193b7efa10a398107..7cf8bbe9ecd58216602e40951ea029db46ddd11c 100644 (file)
@@ -1,17 +1,23 @@
-from telegraf.utils import format_string, format_value
+from typing import Dict, Optional, Union
+
+from telegraf.utils import format_string, format_value, ValueType
 
 
 class Line(object):
-    def __init__(self, measurement, values, tags=None, timestamp=None):
+    def __init__(self,
+                 measurement: ValueType,
+                 values: Union[Dict[str, ValueType], ValueType],
+                 tags: Optional[Dict[str, str]] = None,
+                 timestamp: Optional[int] = None) -> None:
         self.measurement = measurement
         self.values = values
         self.tags = tags
         self.timestamp = timestamp
 
-    def get_output_measurement(self):
+    def get_output_measurement(self) -> str:
         return format_string(self.measurement)
 
-    def get_output_values(self):
+    def get_output_values(self) -> str:
         if not isinstance(self.values, dict):
             metric_values = {'value': self.values}
         else:
@@ -22,7 +28,7 @@ class Line(object):
 
         return ','.join('{0}={1}'.format(format_string(k), format_value(v)) for k, v in sorted_values)
 
-    def get_output_tags(self):
+    def get_output_tags(self) -> str:
         if not self.tags:
             self.tags = dict()
 
@@ -30,10 +36,10 @@ class Line(object):
 
         return ','.join('{0}={1}'.format(format_string(k), format_string(v)) for k, v in sorted_tags)
 
-    def get_output_timestamp(self):
+    def get_output_timestamp(self) -> str:
         return ' {0}'.format(self.timestamp) if self.timestamp else ''
 
-    def to_line_protocol(self):
+    def to_line_protocol(self) -> str:
         tags = self.get_output_tags()
 
         return '{0}{1} {2}{3}'.format(
index 4c7fd1cad2e927761f1412d514acfca4d328e5c8..783e9edc7b0072e4fa29da86621cee3de9d20ef3 100644 (file)
@@ -1,20 +1,26 @@
-def format_string(key):
+from typing import Union
+
+ValueType = Union[str, bool, int, float]
+
+
+def format_string(key: ValueType) -> str:
     if isinstance(key, str):
-        key = key.replace(',', r'\,')
-        key = key.replace(' ', r'\ ')
-        key = key.replace('=', r'\=')
-    return key
+        return key.replace(',', r'\,') \
+                  .replace(' ', r'\ ') \
+                  .replace('=', r'\=')
+    else:
+        return str(key)
 
 
-def format_value(value):
+def format_value(value: ValueType) -> str:
     if isinstance(value, str):
         value = value.replace('"', '\"')
-        value = u'"{0}"'.format(value)
+        return f'"{value}"'
     elif isinstance(value, bool):
-        value = str(value)
+        return str(value)
     elif isinstance(value, int):
-        value = "{0}i".format(value)
+        return f"{value}i"
     elif isinstance(value, float):
-        value = str(value)
-    return value
-
+        return str(value)
+    else:
+        raise ValueError()
index ec9bef219a9064c5fafb17cf970f3aaaa2cb1cec..f886739cc62869631bf333e4d69cd61d4be98122 100644 (file)
@@ -84,6 +84,7 @@ commands =
            -m snap_schedule \
            -m stats \
            -m status \
+           -m telegraf \
            -m telemetry \
            -m test_orchestrator \
            -m volumes \