From c4046a1bf012b247ac781f028350a9283ede8647 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Tue, 5 Jan 2021 12:58:08 +0100 Subject: [PATCH] pybind/mgr: disallow_untyped_defs=True for mgr_util (increases test coverage a tiny bit) Signed-off-by: Sebastian Wagner (cherry picked from commit e308bf8989556617985716aea0c3bfb5da95799f) --- src/pybind/mgr/mgr_util.py | 106 ++++++++++++++-------------- src/pybind/mgr/tox.ini | 26 +++---- src/pybind/mgr/volumes/fs/volume.py | 6 +- 3 files changed, 72 insertions(+), 66 deletions(-) diff --git a/src/pybind/mgr/mgr_util.py b/src/pybind/mgr/mgr_util.py index 1ae6992dc5cc..a9d3f76f2360 100644 --- a/src/pybind/mgr/mgr_util.py +++ b/src/pybind/mgr/mgr_util.py @@ -19,10 +19,13 @@ if sys.version_info >= (3, 3): else: from threading import _Timer as Timer -try: - from typing import Tuple, Any, Callable, Optional, Dict -except ImportError: - TYPE_CHECKING = False # just for type checking +from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic +T = TypeVar('T') + +if TYPE_CHECKING: + from mgr_module import MgrModule + +Module_T = TypeVar('Module_T', bound="MgrModule") ( BLACK, @@ -45,28 +48,28 @@ logger = logging.getLogger(__name__) class CephfsConnectionException(Exception): - def __init__(self, error_code, error_message): + def __init__(self, error_code: int, error_message: str): self.errno = error_code self.error_str = error_message - def to_tuple(self): + def to_tuple(self) -> Tuple[int, str, str]: return self.errno, "", self.error_str - def __str__(self): + def __str__(self) -> str: return "{0} ({1})".format(self.errno, self.error_str) class CephfsConnectionPool(object): class Connection(object): - def __init__(self, mgr, fs_name): - self.fs = None + def __init__(self, mgr: Module_T, fs_name: str): + self.fs: Optional["cephfs.LibCephFS"] = None self.mgr = mgr self.fs_name = fs_name self.ops_in_progress = 0 self.last_used = time.time() self.fs_id = self.get_fs_id() - def get_fs_id(self): + def get_fs_id(self) -> int: fs_map = self.mgr.get('fs_map') for fs in fs_map['filesystems']: if fs['mdsmap']['fs_name'] == self.fs_name: @@ -74,18 +77,18 @@ class CephfsConnectionPool(object): raise CephfsConnectionException( -errno.ENOENT, "FS '{0}' not found".format(self.fs_name)) - def get_fs_handle(self): + def get_fs_handle(self) -> "cephfs.LibCephFS": self.last_used = time.time() self.ops_in_progress += 1 return self.fs - def put_fs_handle(self, notify): + def put_fs_handle(self, notify: Callable) -> None: assert self.ops_in_progress > 0 self.ops_in_progress -= 1 if self.ops_in_progress == 0: notify() - def del_fs_handle(self, waiter): + def del_fs_handle(self, waiter: Optional[Callable]) -> None: if waiter: while self.ops_in_progress != 0: waiter() @@ -94,7 +97,7 @@ class CephfsConnectionPool(object): else: self.abort() - def is_connection_valid(self): + def is_connection_valid(self) -> bool: fs_id = None try: fs_id = self.get_fs_id() @@ -104,10 +107,10 @@ class CephfsConnectionPool(object): logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id)) return self.fs_id == fs_id - def is_connection_idle(self, timeout): + def is_connection_idle(self, timeout: float) -> bool: return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout)) - def connect(self): + def connect(self) -> None: assert self.ops_in_progress == 0 logger.debug("Connecting to cephfs '{0}'".format(self.fs_name)) self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados) @@ -121,7 +124,7 @@ class CephfsConnectionPool(object): logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name)) self.mgr._ceph_register_client(self.fs.get_addrs()) - def disconnect(self): + def disconnect(self) -> None: try: assert self.fs assert self.ops_in_progress == 0 @@ -134,7 +137,7 @@ class CephfsConnectionPool(object): logger.debug("disconnect: ({0})".format(e)) raise - def abort(self): + def abort(self) -> None: assert self.fs assert self.ops_in_progress == 0 logger.info("aborting connection from cephfs '{0}'".format(self.fs_name)) @@ -161,9 +164,9 @@ class CephfsConnectionPool(object): TIMER_TASK_RUN_INTERVAL = 30.0 # seconds CONNECTION_IDLE_INTERVAL = 60.0 # seconds - def __init__(self, mgr): + def __init__(self, mgr: Module_T): self.mgr = mgr - self.connections = {} + self.connections: Dict[str, CephfsConnectionPool.Connection] = {} self.lock = Lock() self.cond = Condition(self.lock) self.timer_task = CephfsConnectionPool.RTimer( @@ -171,7 +174,7 @@ class CephfsConnectionPool(object): self.cleanup_connections) self.timer_task.start() - def cleanup_connections(self): + def cleanup_connections(self) -> None: with self.lock: logger.info("scanning for idle connections..") idle_fs = [fs_name for fs_name, conn in self.connections.items() @@ -180,7 +183,7 @@ class CephfsConnectionPool(object): logger.info("cleaning up connection for '{}'".format(fs_name)) self._del_fs_handle(fs_name) - def get_fs_handle(self, fs_name): + def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS": with self.lock: conn = None try: @@ -205,22 +208,22 @@ class CephfsConnectionPool(object): self.connections[fs_name] = conn return conn.get_fs_handle() - def put_fs_handle(self, fs_name): + def put_fs_handle(self, fs_name: str) -> None: with self.lock: conn = self.connections.get(fs_name, None) if conn: conn.put_fs_handle(notify=lambda: self.cond.notifyAll()) - def _del_fs_handle(self, fs_name, wait=False): + def _del_fs_handle(self, fs_name: str, wait: bool = False) -> None: conn = self.connections.pop(fs_name, None) if conn: conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait()) - def del_fs_handle(self, fs_name, wait=False): + def del_fs_handle(self, fs_name: str, wait: bool = False) -> None: with self.lock: self._del_fs_handle(fs_name, wait) - def del_all_handles(self): + def del_all_handles(self) -> None: with self.lock: for fs_name in list(self.connections.keys()): logger.info("waiting for pending ops for '{}'".format(fs_name)) @@ -231,36 +234,36 @@ class CephfsConnectionPool(object): assert len(self.connections) == 0 -class CephfsClient(object): - def __init__(self, mgr): +class CephfsClient(Generic[Module_T]): + def __init__(self, mgr: Module_T): self.mgr = mgr self.stopping = Event() self.connection_pool = CephfsConnectionPool(self.mgr) - def is_stopping(self): + def is_stopping(self) -> bool: return self.stopping.is_set() - def shutdown(self): + def shutdown(self) -> None: logger.info("shutting down") # first, note that we're shutting down self.stopping.set() # second, delete all libcephfs handles from connection pool self.connection_pool.del_all_handles() - def get_fs(self, fs_name): + def get_fs(self, fs_name: str) -> Optional["cephfs.LibCephFS"]: fs_map = self.mgr.get('fs_map') for fs in fs_map['filesystems']: if fs['mdsmap']['fs_name'] == fs_name: return fs return None - def get_mds_names(self, fs_name): + def get_mds_names(self, fs_name: str) -> List[str]: fs = self.get_fs(fs_name) if fs is None: return [] return [mds['name'] for mds in fs['mdsmap']['info'].values()] - def get_metadata_pool(self, fs_name): + def get_metadata_pool(self, fs_name: str) -> Optional[str]: fs = self.get_fs(fs_name) if fs: return fs['mdsmap']['metadata_pool'] @@ -268,7 +271,7 @@ class CephfsClient(object): @contextlib.contextmanager -def open_filesystem(fsc, fs_name): +def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]: """ Open a volume with shared access. This API is to be used as a context manager. @@ -288,7 +291,7 @@ def open_filesystem(fsc, fs_name): fsc.connection_pool.put_fs_handle(fs_name) -def colorize(msg, color, dark=False): +def colorize(msg: str, color: int, dark: bool = False) -> str: """ Decorate `msg` with escape sequences to give the requested color """ @@ -296,14 +299,14 @@ def colorize(msg, color, dark=False): + msg + RESET_SEQ -def bold(msg): +def bold(msg: str) -> str: """ Decorate `msg` with escape sequences to make it appear bold """ return BOLD_SEQ + msg + RESET_SEQ -def format_units(n, width, colored, decimal): +def format_units(n: int, width: int, colored: bool, decimal: bool) -> str: """ Format a number without units, so as to fit into `width` characters, substituting an appropriate unit suffix. @@ -336,16 +339,15 @@ def format_units(n, width, colored, decimal): return formatted -def format_dimless(n, width, colored=False): +def format_dimless(n: int, width: int, colored: bool = False) -> str: return format_units(n, width, colored, decimal=True) -def format_bytes(n, width, colored=False): +def format_bytes(n: int, width: int, colored: bool = False) -> str: return format_units(n, width, colored, decimal=False) -def merge_dicts(*args): - # type: (dict) -> dict +def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]: """ >>> merge_dicts({1:2}, {3:4}) {1: 2, 3: 4} @@ -364,7 +366,7 @@ def merge_dicts(*args): def get_default_addr(): # type: () -> str - def is_ipv6_enabled(): + def is_ipv6_enabled() -> bool: try: sock = socket.socket(socket.AF_INET6) with contextlib.closing(sock): @@ -547,7 +549,7 @@ def verify_tls_files(cert_fname, pkey_fname): pkey_fname, cert_fname, str(e))) -def get_most_recent_rate(rates): +def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float: """ Get most recent rate from rates :param rates: The derivative between all time series data points [time in seconds, value] @@ -569,7 +571,7 @@ def get_most_recent_rate(rates): return 0.0 return rates[-1][1] -def get_time_series_rates(data): +def get_time_series_rates(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]: """ Rates from time series data :param data: Time series data [time in seconds, value] @@ -594,11 +596,11 @@ def get_time_series_rates(data): data = _filter_time_series(data) if not data: return [] - return [(data2[0], _derivative(data1, data2)) for data1, data2 in + return [(data2[0], _derivative(data1, data2) if data1 is not None else 0.0) for data1, data2 in _pairwise(data)] -def _filter_time_series(data): +def _filter_time_series(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]: """ Filters time series data Filters out samples with the same timestamp in given time series data. @@ -643,7 +645,7 @@ def _filter_time_series(data): return filtered -def _derivative(p1, p2): +def _derivative(p1: Tuple[float, float], p2: Tuple[float, float]) -> float: """ Derivative between two time series data points :param p1: Time series data [time in seconds, value] @@ -664,7 +666,7 @@ def _derivative(p1, p2): return (p2[1] - p1[1]) / float(p2[0] - p1[0]) -def _pairwise(iterable): +def _pairwise(iterable: Iterable[T]) -> Generator[Tuple[Optional[T], T], None, None]: it = iter(iterable) a = next(it, None) @@ -673,7 +675,7 @@ def _pairwise(iterable): a = b -def to_pretty_timedelta(n): +def to_pretty_timedelta(n: datetime.timedelta) -> str: if n < datetime.timedelta(seconds=120): return str(n.seconds) + 's' if n < datetime.timedelta(minutes=120): @@ -689,14 +691,14 @@ def to_pretty_timedelta(n): return str(n.days // 365) + 'y' -def profile_method(skip_attribute=False): +def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]: """ Decorator for methods of the Module class. Logs the name of the given function f with the time it takes to execute it. """ - def outer(f): + def outer(f: Callable[..., T]) -> Callable[..., T]: @wraps(f) - def wrapper(*args, **kwargs): + def wrapper(*args: Any, **kwargs: Any) -> T: self = args[0] t = time.time() self.log.debug('Starting method {}.'.format(f.__name__)) diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini index 020930e27390..5ea77228c623 100644 --- a/src/pybind/mgr/tox.ini +++ b/src/pybind/mgr/tox.ini @@ -55,19 +55,19 @@ deps = mypy==0.790 commands = mypy --config-file=../../mypy.ini \ - cephadm/module.py \ - mgr_module.py \ - dashboard/module.py \ - prometheus/module.py \ - mgr_util.py \ - orchestrator/__init__.py \ - progress/module.py \ - rook/module.py \ - snap_schedule/module.py \ - stats/module.py \ - test_orchestrator/module.py \ - mds_autoscaler/module.py \ - volumes/__init__.py + -m cephadm \ + -m dashboard \ + -m mds_autoscaler \ + -m mgr_module \ + -m mgr_util \ + -m orchestrator \ + -m progress \ + -m prometheus \ + -m rook \ + -m snap_schedule \ + -m stats \ + -m test_orchestrator \ + -m volumes [testenv:test] setenv = {[testenv]setenv} diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py index 9f82fa550ddb..af7ebaaa5130 100644 --- a/src/pybind/mgr/volumes/fs/volume.py +++ b/src/pybind/mgr/volumes/fs/volume.py @@ -1,6 +1,7 @@ import json import errno import logging +from typing import TYPE_CHECKING import cephfs @@ -20,6 +21,9 @@ from .async_cloner import Cloner from .purge_queue import ThreadPoolPurgeQueueMixin from .operations.template import SubvolumeOpType +if TYPE_CHECKING: + from volumes import Module + log = logging.getLogger(__name__) ALLOWED_ACCESS_LEVELS = ('r', 'rw') @@ -42,7 +46,7 @@ def name_to_json(names): return json.dumps(namedict, indent=4, sort_keys=True) -class VolumeClient(CephfsClient): +class VolumeClient(CephfsClient["Module"]): def __init__(self, mgr): super().__init__(mgr) # volume specification -- 2.47.3