]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr: disallow_untyped_defs=True for mgr_util
authorSebastian Wagner <sebastian.wagner@suse.com>
Tue, 5 Jan 2021 11:58:08 +0000 (12:58 +0100)
committerJuan Miguel Olmo Martínez <jolmomar@redhat.com>
Tue, 26 Jan 2021 14:58:28 +0000 (15:58 +0100)
(increases test coverage a tiny bit)

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
(cherry picked from commit e308bf8989556617985716aea0c3bfb5da95799f)

src/pybind/mgr/mgr_util.py
src/pybind/mgr/tox.ini
src/pybind/mgr/volumes/fs/volume.py

index 1ae6992dc5cc6ddb4b9978052d3496327b34b702..a9d3f76f236053e0eeeb655159d7c84904693c5a 100644 (file)
@@ -19,10 +19,13 @@ if sys.version_info >= (3, 3):
 else:
     from threading import _Timer as Timer
 
-try:
-    from typing import Tuple, Any, Callable, Optional, Dict
-except ImportError:
-    TYPE_CHECKING = False  # just for type checking
+from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic
+T = TypeVar('T')
+
+if TYPE_CHECKING:
+    from mgr_module import MgrModule
+
+Module_T = TypeVar('Module_T', bound="MgrModule")
 
 (
     BLACK,
@@ -45,28 +48,28 @@ logger = logging.getLogger(__name__)
 
 
 class CephfsConnectionException(Exception):
-    def __init__(self, error_code, error_message):
+    def __init__(self, error_code: int, error_message: str):
         self.errno = error_code
         self.error_str = error_message
 
-    def to_tuple(self):
+    def to_tuple(self) -> Tuple[int, str, str]:
         return self.errno, "", self.error_str
 
-    def __str__(self):
+    def __str__(self) -> str:
         return "{0} ({1})".format(self.errno, self.error_str)
 
 
 class CephfsConnectionPool(object):
     class Connection(object):
-        def __init__(self, mgr, fs_name):
-            self.fs = None
+        def __init__(self, mgr: Module_T, fs_name: str):
+            self.fs: Optional["cephfs.LibCephFS"] = None
             self.mgr = mgr
             self.fs_name = fs_name
             self.ops_in_progress = 0
             self.last_used = time.time()
             self.fs_id = self.get_fs_id()
 
-        def get_fs_id(self):
+        def get_fs_id(self) -> int:
             fs_map = self.mgr.get('fs_map')
             for fs in fs_map['filesystems']:
                 if fs['mdsmap']['fs_name'] == self.fs_name:
@@ -74,18 +77,18 @@ class CephfsConnectionPool(object):
             raise CephfsConnectionException(
                 -errno.ENOENT, "FS '{0}' not found".format(self.fs_name))
 
-        def get_fs_handle(self):
+        def get_fs_handle(self) -> "cephfs.LibCephFS":
             self.last_used = time.time()
             self.ops_in_progress += 1
             return self.fs
 
-        def put_fs_handle(self, notify):
+        def put_fs_handle(self, notify: Callable) -> None:
             assert self.ops_in_progress > 0
             self.ops_in_progress -= 1
             if self.ops_in_progress == 0:
                 notify()
 
-        def del_fs_handle(self, waiter):
+        def del_fs_handle(self, waiter: Optional[Callable]) -> None:
             if waiter:
                 while self.ops_in_progress != 0:
                     waiter()
@@ -94,7 +97,7 @@ class CephfsConnectionPool(object):
             else:
                 self.abort()
 
-        def is_connection_valid(self):
+        def is_connection_valid(self) -> bool:
             fs_id = None
             try:
                 fs_id = self.get_fs_id()
@@ -104,10 +107,10 @@ class CephfsConnectionPool(object):
             logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
             return self.fs_id == fs_id
 
-        def is_connection_idle(self, timeout):
+        def is_connection_idle(self, timeout: float) -> bool:
             return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
 
-        def connect(self):
+        def connect(self) -> None:
             assert self.ops_in_progress == 0
             logger.debug("Connecting to cephfs '{0}'".format(self.fs_name))
             self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
@@ -121,7 +124,7 @@ class CephfsConnectionPool(object):
             logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
             self.mgr._ceph_register_client(self.fs.get_addrs())
 
-        def disconnect(self):
+        def disconnect(self) -> None:
             try:
                 assert self.fs
                 assert self.ops_in_progress == 0
@@ -134,7 +137,7 @@ class CephfsConnectionPool(object):
                 logger.debug("disconnect: ({0})".format(e))
                 raise
 
-        def abort(self):
+        def abort(self) -> None:
             assert self.fs
             assert self.ops_in_progress == 0
             logger.info("aborting connection from cephfs '{0}'".format(self.fs_name))
@@ -161,9 +164,9 @@ class CephfsConnectionPool(object):
     TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
     CONNECTION_IDLE_INTERVAL = 60.0  # seconds
 
-    def __init__(self, mgr):
+    def __init__(self, mgr: Module_T):
         self.mgr = mgr
-        self.connections = {}
+        self.connections: Dict[str, CephfsConnectionPool.Connection] = {}
         self.lock = Lock()
         self.cond = Condition(self.lock)
         self.timer_task = CephfsConnectionPool.RTimer(
@@ -171,7 +174,7 @@ class CephfsConnectionPool(object):
             self.cleanup_connections)
         self.timer_task.start()
 
-    def cleanup_connections(self):
+    def cleanup_connections(self) -> None:
         with self.lock:
             logger.info("scanning for idle connections..")
             idle_fs = [fs_name for fs_name, conn in self.connections.items()
@@ -180,7 +183,7 @@ class CephfsConnectionPool(object):
                 logger.info("cleaning up connection for '{}'".format(fs_name))
                 self._del_fs_handle(fs_name)
 
-    def get_fs_handle(self, fs_name):
+    def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
         with self.lock:
             conn = None
             try:
@@ -205,22 +208,22 @@ class CephfsConnectionPool(object):
             self.connections[fs_name] = conn
             return conn.get_fs_handle()
 
-    def put_fs_handle(self, fs_name):
+    def put_fs_handle(self, fs_name: str) -> None:
         with self.lock:
             conn = self.connections.get(fs_name, None)
             if conn:
                 conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
 
-    def _del_fs_handle(self, fs_name, wait=False):
+    def _del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
         conn = self.connections.pop(fs_name, None)
         if conn:
             conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
 
-    def del_fs_handle(self, fs_name, wait=False):
+    def del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
         with self.lock:
             self._del_fs_handle(fs_name, wait)
 
-    def del_all_handles(self):
+    def del_all_handles(self) -> None:
         with self.lock:
             for fs_name in list(self.connections.keys()):
                 logger.info("waiting for pending ops for '{}'".format(fs_name))
@@ -231,36 +234,36 @@ class CephfsConnectionPool(object):
             assert len(self.connections) == 0
 
 
-class CephfsClient(object):
-    def __init__(self, mgr):
+class CephfsClient(Generic[Module_T]):
+    def __init__(self, mgr: Module_T):
         self.mgr = mgr
         self.stopping = Event()
         self.connection_pool = CephfsConnectionPool(self.mgr)
 
-    def is_stopping(self):
+    def is_stopping(self) -> bool:
         return self.stopping.is_set()
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         logger.info("shutting down")
         # first, note that we're shutting down
         self.stopping.set()
         # second, delete all libcephfs handles from connection pool
         self.connection_pool.del_all_handles()
 
-    def get_fs(self, fs_name):
+    def get_fs(self, fs_name: str) -> Optional["cephfs.LibCephFS"]:
         fs_map = self.mgr.get('fs_map')
         for fs in fs_map['filesystems']:
             if fs['mdsmap']['fs_name'] == fs_name:
                 return fs
         return None
 
-    def get_mds_names(self, fs_name):
+    def get_mds_names(self, fs_name: str) -> List[str]:
         fs = self.get_fs(fs_name)
         if fs is None:
             return []
         return [mds['name'] for mds in fs['mdsmap']['info'].values()]
 
-    def get_metadata_pool(self, fs_name):
+    def get_metadata_pool(self, fs_name: str) -> Optional[str]:
         fs = self.get_fs(fs_name)
         if fs:
             return fs['mdsmap']['metadata_pool']
@@ -268,7 +271,7 @@ class CephfsClient(object):
 
 
 @contextlib.contextmanager
-def open_filesystem(fsc, fs_name):
+def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
     """
     Open a volume with shared access.
     This API is to be used as a context manager.
@@ -288,7 +291,7 @@ def open_filesystem(fsc, fs_name):
         fsc.connection_pool.put_fs_handle(fs_name)
 
 
-def colorize(msg, color, dark=False):
+def colorize(msg: str, color: int, dark: bool = False) -> str:
     """
     Decorate `msg` with escape sequences to give the requested color
     """
@@ -296,14 +299,14 @@ def colorize(msg, color, dark=False):
         + msg + RESET_SEQ
 
 
-def bold(msg):
+def bold(msg: str) -> str:
     """
     Decorate `msg` with escape sequences to make it appear bold
     """
     return BOLD_SEQ + msg + RESET_SEQ
 
 
-def format_units(n, width, colored, decimal):
+def format_units(n: int, width: int, colored: bool, decimal: bool) -> str:
     """
     Format a number without units, so as to fit into `width` characters, substituting
     an appropriate unit suffix.
@@ -336,16 +339,15 @@ def format_units(n, width, colored, decimal):
         return formatted
 
 
-def format_dimless(n, width, colored=False):
+def format_dimless(n: int, width: int, colored: bool = False) -> str:
     return format_units(n, width, colored, decimal=True)
 
 
-def format_bytes(n, width, colored=False):
+def format_bytes(n: int, width: int, colored: bool = False) -> str:
     return format_units(n, width, colored, decimal=False)
 
 
-def merge_dicts(*args):
-    # type: (dict) -> dict
+def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
     """
     >>> merge_dicts({1:2}, {3:4})
     {1: 2, 3: 4}
@@ -364,7 +366,7 @@ def merge_dicts(*args):
 
 def get_default_addr():
     # type: () -> str
-    def is_ipv6_enabled():
+    def is_ipv6_enabled() -> bool:
         try:
             sock = socket.socket(socket.AF_INET6)
             with contextlib.closing(sock):
@@ -547,7 +549,7 @@ def verify_tls_files(cert_fname, pkey_fname):
                 pkey_fname, cert_fname, str(e)))
 
 
-def get_most_recent_rate(rates):
+def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float:
     """ Get most recent rate from rates
 
     :param rates: The derivative between all time series data points [time in seconds, value]
@@ -569,7 +571,7 @@ def get_most_recent_rate(rates):
         return 0.0
     return rates[-1][1]
 
-def get_time_series_rates(data):
+def get_time_series_rates(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
     """ Rates from time series data
 
     :param data: Time series data [time in seconds, value]
@@ -594,11 +596,11 @@ def get_time_series_rates(data):
     data = _filter_time_series(data)
     if not data:
         return []
-    return [(data2[0], _derivative(data1, data2)) for data1, data2 in
+    return [(data2[0], _derivative(data1, data2) if data1 is not None else 0.0) for data1, data2 in
             _pairwise(data)]
 
 
-def _filter_time_series(data):
+def _filter_time_series(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
     """ Filters time series data
 
     Filters out samples with the same timestamp in given time series data.
@@ -643,7 +645,7 @@ def _filter_time_series(data):
     return filtered
 
 
-def _derivative(p1, p2):
+def _derivative(p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
     """ Derivative between two time series data points
 
     :param p1: Time series data [time in seconds, value]
@@ -664,7 +666,7 @@ def _derivative(p1, p2):
     return (p2[1] - p1[1]) / float(p2[0] - p1[0])
 
 
-def _pairwise(iterable):
+def _pairwise(iterable: Iterable[T]) -> Generator[Tuple[Optional[T], T], None, None]:
     it = iter(iterable)
     a = next(it, None)
 
@@ -673,7 +675,7 @@ def _pairwise(iterable):
         a = b
 
 
-def to_pretty_timedelta(n):
+def to_pretty_timedelta(n: datetime.timedelta) -> str:
     if n < datetime.timedelta(seconds=120):
         return str(n.seconds) + 's'
     if n < datetime.timedelta(minutes=120):
@@ -689,14 +691,14 @@ def to_pretty_timedelta(n):
     return str(n.days // 365) + 'y'
 
 
-def profile_method(skip_attribute=False):
+def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]:
     """
     Decorator for methods of the Module class. Logs the name of the given
     function f with the time it takes to execute it.
     """
-    def outer(f):
+    def outer(f: Callable[..., T]) -> Callable[..., T]:
         @wraps(f)
-        def wrapper(*args, **kwargs):
+        def wrapper(*args: Any, **kwargs: Any) -> T:
             self = args[0]
             t = time.time()
             self.log.debug('Starting method {}.'.format(f.__name__))
index 020930e27390cea19717d7421d0f09df326c682d..5ea77228c623fd3f10d35c2704af2ba7ece13078 100644 (file)
@@ -55,19 +55,19 @@ deps =
     mypy==0.790
 commands =
     mypy --config-file=../../mypy.ini \
-           cephadm/module.py \
-           mgr_module.py \
-           dashboard/module.py \
-           prometheus/module.py \
-           mgr_util.py \
-           orchestrator/__init__.py \
-           progress/module.py \
-           rook/module.py \
-           snap_schedule/module.py \
-           stats/module.py \
-           test_orchestrator/module.py \
-           mds_autoscaler/module.py \
-           volumes/__init__.py
+           -m cephadm \
+           -m dashboard \
+           -m mds_autoscaler \
+           -m mgr_module \
+           -m mgr_util \
+           -m orchestrator \
+           -m progress \
+           -m prometheus \
+           -m rook \
+           -m snap_schedule \
+           -m stats \
+           -m test_orchestrator \
+           -m volumes
 
 [testenv:test]
 setenv = {[testenv]setenv}
index 9f82fa550ddbc61175ee76eb45ea6e1b824fc303..af7ebaaa5130aa9a7b863c8bb48ae391521da3cd 100644 (file)
@@ -1,6 +1,7 @@
 import json
 import errno
 import logging
+from typing import TYPE_CHECKING
 
 import cephfs
 
@@ -20,6 +21,9 @@ from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
 from .operations.template import SubvolumeOpType
 
+if TYPE_CHECKING:
+    from volumes import Module
+
 log = logging.getLogger(__name__)
 
 ALLOWED_ACCESS_LEVELS = ('r', 'rw')
@@ -42,7 +46,7 @@ def name_to_json(names):
     return json.dumps(namedict, indent=4, sort_keys=True)
 
 
-class VolumeClient(CephfsClient):
+class VolumeClient(CephfsClient["Module"]):
     def __init__(self, mgr):
         super().__init__(mgr)
         # volume specification