From a0930a63106e9f0d556e18bd70ffca6566388742 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Thu, 28 Jan 2021 12:44:07 +0100 Subject: [PATCH] mgr/rook: disallow_untyped_defs = True Signed-off-by: Sebastian Wagner --- src/mypy.ini | 8 ++ src/pybind/mgr/rook/module.py | 104 +++++++++++++++---------- src/pybind/mgr/rook/rook_cluster.py | 114 ++++++++++++++++------------ 3 files changed, 137 insertions(+), 89 deletions(-) diff --git a/src/mypy.ini b/src/mypy.ini index 8bd3a1d6c263f..61b930737d01b 100755 --- a/src/mypy.ini +++ b/src/mypy.ini @@ -33,9 +33,17 @@ disallow_untyped_defs = True [mypy-iostat.*] disallow_untyped_defs = True +>>>>>>> 514d3b6785... mgr/rook: disallow_untyped_defs = True [mypy-orchestrator.*] disallow_untyped_defs = True +[mypy-rook.*] +disallow_untyped_defs = True + +# external import +[mypy-rook.rook_client.*] +disallow_untyped_defs = False + [mypy-snap_schedule.*] disallow_untyped_defs = True diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py index c6a6293b4d5bb..becbe8ef41610 100644 --- a/src/pybind/mgr/rook/module.py +++ b/src/pybind/mgr/rook/module.py @@ -7,8 +7,9 @@ import json from ceph.deployment import inventory from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec +from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple + try: - from typing import List, Dict, Optional, Callable, Any from ceph.deployment.drive_group import DriveGroupSpec except ImportError: pass # just for type checking @@ -21,7 +22,7 @@ try: # https://github.com/kubernetes-client/python/issues/895 from kubernetes.client.models.v1_container_image import V1ContainerImage - def names(self, names): + def names(self: Any, names: Any) -> None: self._names = names V1ContainerImage.names = V1ContainerImage.names.setter(names) @@ -35,32 +36,39 @@ import orchestrator from .rook_cluster import RookCluster +T = TypeVar('T') +FuncT = TypeVar('FuncT', bound=Callable) +ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec) + + -class RookCompletion(orchestrator.Completion): - def evaluate(self): +class RookCompletion(orchestrator.Completion[T]): + def evaluate(self) -> None: self.finalize(None) def deferred_read(f): - # type: (Callable) -> Callable[..., RookCompletion] + # type: (Callable[..., T]) -> Callable[..., RookCompletion[T]] + + # See https://stackoverflow.com/questions/65936408/typing-function-when-decorator-change-generic-return-type """ Decorator to make RookOrchestrator methods return a completion object that executes themselves. """ @functools.wraps(f) - def wrapper(*args, **kwargs): + def wrapper(*args: Any, **kwargs: Any) -> RookCompletion[T]: return RookCompletion(on_complete=lambda _: f(*args, **kwargs)) return wrapper -def write_completion(on_complete, # type: Callable +def write_completion(on_complete, # type: Callable[[], T] message, # type: str - mgr, + mgr: 'RookOrchestrator', calc_percent=None # type: Optional[Callable[[], RookCompletion]] ): - # type: (...) -> RookCompletion + # type: (...) -> RookCompletion[T] return RookCompletion.with_progress( message=message, mgr=mgr, @@ -70,7 +78,7 @@ def write_completion(on_complete, # type: Callable class RookEnv(object): - def __init__(self): + def __init__(self) -> None: # POD_NAMESPACE already exist for Rook 0.9 self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph') @@ -81,10 +89,10 @@ class RookEnv(object): self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1') self.api_name = "ceph.rook.io/" + self.crd_version - def api_version_match(self): + def api_version_match(self) -> bool: return self.crd_version == 'v1' - def has_namespace(self): + def has_namespace(self) -> bool: return 'POD_NAMESPACE' in os.environ @@ -110,14 +118,14 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): p.evaluate() @staticmethod - def can_run(): + def can_run() -> Tuple[bool, str]: if not kubernetes_imported: return False, "`kubernetes` python module not found" if not RookEnv().api_version_match(): return False, "Rook version unsupported." return True, '' - def available(self): + def available(self) -> Tuple[bool, str]: if not kubernetes_imported: return False, "`kubernetes` python module not found" elif not self._rook_env.has_namespace(): @@ -130,20 +138,20 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): else: return True, "" - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super(RookOrchestrator, self).__init__(*args, **kwargs) self._initialized = threading.Event() - self._k8s_CoreV1_api = None - self._k8s_BatchV1_api = None - self._rook_cluster = None + self._k8s_CoreV1_api: Optional[client.CoreV1Api] = None + self._k8s_BatchV1_api: Optional[client.BatchV1Api] = None + self._rook_cluster: Optional[RookCluster] = None self._rook_env = RookEnv() self._shutdown = threading.Event() self.all_progress_references = list() # type: List[orchestrator.ProgressReference] - def shutdown(self): + def shutdown(self) -> None: self._shutdown.set() @property @@ -160,7 +168,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): assert self._rook_cluster is not None return self._rook_cluster - def serve(self): + def serve(self) -> None: # For deployed clusters, we should always be running inside # a Rook cluster. For development convenience, also support # running outside (reading ~/.kube config) @@ -206,13 +214,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self._shutdown.wait(5) - def cancel_completions(self): + def cancel_completions(self) -> None: for p in self.all_progress_references: p.fail() self.all_progress_references.clear() @deferred_read - def get_inventory(self, host_filter=None, refresh=False): + def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]: host_list = None if host_filter and host_filter.hosts: # Explicit host list @@ -222,10 +230,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): # it into RookCluster.get_discovered_devices raise NotImplementedError() - devs = self.rook_cluster.get_discovered_devices(host_list) + discovered_devs = self.rook_cluster.get_discovered_devices(host_list) result = [] - for host_name, host_devs in devs.items(): + for host_name, host_devs in discovered_devs.items(): devs = [] for d in host_devs: if 'cephVolumeData' in d and d['cephVolumeData']: @@ -251,8 +259,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()] @deferred_read - def describe_service(self, service_type=None, service_name=None, - refresh=False): + def describe_service(self, + service_type: Optional[str] = None, + service_name: Optional[str] = None, + refresh: bool = False) -> List[orchestrator.ServiceDescription]: now = datetime.datetime.utcnow() # CephCluster @@ -384,24 +394,32 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): service.container_image_id = dd.container_image_id if not service.container_image_name: service.container_image_name = dd.container_image_name - if not service.last_refresh or not dd.last_refresh or dd.last_refresh < service.last_refresh: + if service.last_refresh is None or not dd.last_refresh or dd.last_refresh < service.last_refresh: service.last_refresh = dd.last_refresh - if not service.created or dd.created < service.created: + if service.created is None or dd.created is None or dd.created < service.created: service.created = dd.created return [v for k, v in spec.items()] @deferred_read - def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, - refresh=False): + def list_daemons(self, + service_name: Optional[str] = None, + daemon_type: Optional[str] = None, + daemon_id: Optional[str] = None, + host: Optional[str] = None, + refresh: bool = False) -> List[orchestrator.DaemonDescription]: return self._list_daemons(service_name=service_name, daemon_type=daemon_type, daemon_id=daemon_id, host=host, refresh=refresh) - def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, - refresh=False): + def _list_daemons(self, + service_name: Optional[str] = None, + daemon_type: Optional[str] = None, + daemon_id: Optional[str] = None, + host: Optional[str] = None, + refresh: bool = False) -> List[orchestrator.DaemonDescription]: pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host) self.log.debug('pods %s' % pods) result = [] @@ -440,21 +458,21 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): return result - def _service_add_decorate(self, typename, spec, func): + def _service_add_decorate(self, typename: str, spec: ServiceSpecT, func: Callable[[ServiceSpecT], T]) -> RookCompletion[T]: return write_completion( on_complete=lambda : func(spec), message="Creating {} services for {}".format(typename, spec.service_id), mgr=self ) - def _service_rm_decorate(self, typename, name, func): + def _service_rm_decorate(self, typename: str, name: str, func: Callable[[], T]) -> RookCompletion[T]: return write_completion( on_complete=lambda : func(), message="Removing {} services for {}".format(typename, name), mgr=self ) - def remove_service(self, service_name): + def remove_service(self, service_name: str) -> RookCompletion[str]: service_type, service_name = service_name.split('.', 1) if service_type == 'mds': return self._service_rm_decorate( @@ -469,9 +487,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): return self._service_rm_decorate( 'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name) ) + else: + raise orchestrator.OrchestratorError(f'Service type {service_type} not supported') def apply_mon(self, spec): - # type: (ServiceSpec) -> RookCompletion + # type: (ServiceSpec) -> RookCompletion[str] if spec.placement.hosts or spec.placement.label: raise RuntimeError("Host list or label is not supported by rook.") @@ -482,21 +502,21 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): ) def apply_mds(self, spec): - # type: (ServiceSpec) -> RookCompletion + # type: (ServiceSpec) -> RookCompletion[str] return self._service_add_decorate('MDS', spec, self.rook_cluster.apply_filesystem) def apply_rgw(self, spec): - # type: (RGWSpec) -> RookCompletion + # type: (RGWSpec) -> RookCompletion[str] return self._service_add_decorate('RGW', spec, self.rook_cluster.apply_objectstore) def apply_nfs(self, spec): - # type: (NFSServiceSpec) -> RookCompletion + # type: (NFSServiceSpec) -> RookCompletion[str] return self._service_add_decorate("NFS", spec, self.rook_cluster.apply_nfsgw) - def remove_daemons(self, names): + def remove_daemons(self, names: List[str]) -> RookCompletion[List[str]]: return write_completion( lambda: self.rook_cluster.remove_pods(names), "Removing daemons {}".format(','.join(names)), @@ -504,7 +524,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): ) def create_osds(self, drive_group): - # type: (DriveGroupSpec) -> RookCompletion + # type: (DriveGroupSpec) -> RookCompletion[str] """ Creates OSDs from a drive group specification. $: ceph orch osd create -i @@ -544,7 +564,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): ) @deferred_read - def has_osds(matching_hosts): + def has_osds(matching_hosts: List[str]) -> bool: # Find OSD pods on this host pod_osd_ids = set() diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py index d40613090b5a6..75e3fb9129bea 100644 --- a/src/pybind/mgr/rook/rook_cluster.py +++ b/src/pybind/mgr/rook/rook_cluster.py @@ -21,13 +21,11 @@ from urllib.parse import urljoin from urllib3.exceptions import ProtocolError from ceph.deployment.drive_group import DriveGroupSpec -from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec +from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec from mgr_util import merge_dicts -try: - from typing import Optional -except ImportError: - pass # just for type annotations +from typing import Optional, TypeVar, List, Callable, Any, cast, Generic, \ + Iterable, Dict, Iterator, Type try: from kubernetes import client, watch @@ -40,6 +38,7 @@ from .rook_client.ceph import cephfilesystem as cfs from .rook_client.ceph import cephnfs as cnfs from .rook_client.ceph import cephobjectstore as cos from .rook_client.ceph import cephcluster as ccl +from .rook_client._helper import CrdClass import orchestrator @@ -47,14 +46,20 @@ import orchestrator try: from rook.module import RookEnv - from typing import List, Dict except ImportError: pass # just used for type checking. + +T = TypeVar('T') +FuncT = TypeVar('FuncT', bound=Callable) + +CrdClassT = TypeVar('CrdClassT', bound=CrdClass) + + log = logging.getLogger(__name__) -def _urllib3_supports_read_chunked(): +def __urllib3_supports_read_chunked() -> bool: # There is a bug in CentOS 7 as it ships a urllib3 which is lower # than required by kubernetes-client try: @@ -64,7 +69,7 @@ def _urllib3_supports_read_chunked(): return False -_urllib3_supports_read_chunked = _urllib3_supports_read_chunked() +_urllib3_supports_read_chunked = __urllib3_supports_read_chunked() class ApplyException(orchestrator.OrchestratorError): """ @@ -74,17 +79,17 @@ class ApplyException(orchestrator.OrchestratorError): """ -def threaded(f): - def wrapper(*args, **kwargs): +def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]: + def wrapper(*args: Any, **kwargs: Any) -> threading.Thread: t = threading.Thread(target=f, args=args, kwargs=kwargs) t.start() return t - return wrapper + return cast(Callable[..., threading.Thread], wrapper) -class KubernetesResource(object): - def __init__(self, api_func, **kwargs): +class KubernetesResource(Generic[T]): + def __init__(self, api_func: Callable, **kwargs: Any) -> None: """ Generic kubernetes Resource parent class @@ -99,13 +104,13 @@ class KubernetesResource(object): self.api_func = api_func # ``_items`` is accessed by different threads. I assume assignment is atomic. - self._items = dict() + self._items: Dict[str, T] = dict() self.thread = None # type: Optional[threading.Thread] - self.exception = None + self.exception: Optional[Exception] = None if not _urllib3_supports_read_chunked: logging.info('urllib3 is too old. Fallback to full fetches') - def _fetch(self): + def _fetch(self) -> str: """ Execute the requested api method as a one-off fetch""" response = self.api_func(**self.kwargs) # metadata is a client.V1ListMeta object type @@ -115,7 +120,7 @@ class KubernetesResource(object): return metadata.resource_version @property - def items(self): + def items(self) -> Iterable[T]: """ Returns the items of the request. Creates the watcher as a side effect. @@ -135,7 +140,7 @@ class KubernetesResource(object): return self._items.values() @threaded - def _watch(self, res_ver): + def _watch(self, res_ver: Optional[str]) -> None: """ worker thread that runs the kubernetes watch """ self.exception = None @@ -183,29 +188,31 @@ class KubernetesResource(object): class RookCluster(object): - def __init__(self, coreV1_api, batchV1_api, rook_env): + # import of client.CoreV1Api must be optional at import time. + # Instead allow mgr/rook to be imported anyway. + def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', rook_env: 'RookEnv'): self.rook_env = rook_env # type: RookEnv self.coreV1_api = coreV1_api # client.CoreV1Api self.batchV1_api = batchV1_api # TODO: replace direct k8s calls with Rook API calls # when they're implemented - self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map, + self.inventory_maps: KubernetesResource[client.V1ConfigMapList] = KubernetesResource(self.coreV1_api.list_namespaced_config_map, namespace=self.rook_env.operator_namespace, label_selector="app=rook-discover") - self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod, + self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace=self.rook_env.namespace, label_selector="rook_cluster={0}".format( self.rook_env.namespace)) - self.nodes = KubernetesResource(self.coreV1_api.list_node) + self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node) - def rook_url(self, path): + def rook_url(self, path: str) -> str: prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % ( self.rook_env.crd_version, self.rook_env.namespace) return urljoin(prefix, path) - def rook_api_call(self, verb, path, **kwargs): + def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any: full_path = self.rook_url(path) log.debug("[%s] %s" % (verb, full_path)) @@ -218,22 +225,22 @@ class RookCluster(object): _preload_content=True, **kwargs) - def rook_api_get(self, path, **kwargs): + def rook_api_get(self, path: str, **kwargs: Any) -> Any: return self.rook_api_call("GET", path, **kwargs) - def rook_api_delete(self, path): + def rook_api_delete(self, path: str) -> Any: return self.rook_api_call("DELETE", path) - def rook_api_patch(self, path, **kwargs): + def rook_api_patch(self, path: str, **kwargs: Any) -> Any: return self.rook_api_call("PATCH", path, header_params={"Content-Type": "application/json-patch+json"}, **kwargs) - def rook_api_post(self, path, **kwargs): + def rook_api_post(self, path: str, **kwargs: Any) -> Any: return self.rook_api_call("POST", path, **kwargs) - def get_discovered_devices(self, nodenames=None): - def predicate(item): + def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, dict]: + def predicate(item: client.V1ConfigMapList) -> bool: if nodenames is not None: return item.metadata.labels['rook.io/node'] in nodenames else: @@ -252,7 +259,7 @@ class RookCluster(object): return nodename_to_devices - def get_nfs_conf_url(self, nfs_cluster, instance): + def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]: # # Fetch cephnfs object for "nfs_cluster" and then return a rados:// # URL for the instance within that cluster. If the fetch fails, just @@ -273,7 +280,10 @@ class RookCluster(object): url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance) return url - def describe_pods(self, service_type, service_id, nodename): + def describe_pods(self, + service_type: Optional[str], + service_id: Optional[str], + nodename: Optional[str]) -> List[Dict[str, Any]]: """ Go query the k8s API about deployment, containers related to this filesystem @@ -358,9 +368,8 @@ class RookCluster(object): return pods_summary - def remove_pods(self, names): + def remove_pods(self, names: List[str]) -> List[str]: pods = [i for i in self.rook_pods.items] - num = 0 for p in pods: d = p.to_dict() daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-','') @@ -372,14 +381,13 @@ class RookCluster(object): self.rook_env.namespace, body=client.V1DeleteOptions() ) - num += 1 - return "Removed %d pods" % num + return [f'Removed Pod {n}' for n in names] - def get_node_names(self): + def get_node_names(self) -> List[str]: return [i.metadata.name for i in self.nodes.items] @contextmanager - def ignore_409(self, what): + def ignore_409(self, what: str) -> Iterator[None]: try: yield except ApiException as e: @@ -389,7 +397,7 @@ class RookCluster(object): else: raise - def apply_filesystem(self, spec: ServiceSpec) -> None: + def apply_filesystem(self, spec: ServiceSpec) -> str: # TODO use spec.placement # TODO warn if spec.extended has entries we don't kow how # to action. @@ -411,15 +419,17 @@ class RookCluster(object): ) ) ) + assert spec.service_id is not None return self._create_or_patch( cfs.CephFilesystem, 'cephfilesystems', spec.service_id, _update_fs, _create_fs) - def apply_objectstore(self, spec): + def apply_objectstore(self, spec: RGWSpec) -> str: # FIXME: service_id is $realm.$zone, but rook uses realm # $crname and zone $crname. The '.' will confuse kubernetes. # For now, assert that realm==zone. + assert spec.service_id is not None (realm, zone) = spec.service_id.split('.', 1) assert realm == zone assert spec.subcluster is None @@ -456,7 +466,7 @@ class RookCluster(object): cos.CephObjectStore, 'cephobjectstores', name, _update_zone, _create_zone) - def apply_nfsgw(self, spec: NFSServiceSpec) -> None: + def apply_nfsgw(self, spec: NFSServiceSpec) -> str: # TODO use spec.placement # TODO warn if spec.extended has entries we don't kow how # to action. @@ -489,10 +499,11 @@ class RookCluster(object): return rook_nfsgw + assert spec.service_id is not None return self._create_or_patch(cnfs.CephNFS, 'cephnfses', spec.service_id, _update_nfs, _create_nfs) - def rm_service(self, rooktype, service_id): + def rm_service(self, rooktype: str, service_id: str) -> str: objpath = "{0}/{1}".format(rooktype, service_id) @@ -505,7 +516,9 @@ class RookCluster(object): else: raise - def can_create_osd(self): + return f'Removed {objpath}' + + def can_create_osd(self) -> bool: current_cluster = self.rook_api_get( "cephclusters/{0}".format(self.rook_env.cluster_name)) use_all_nodes = current_cluster['spec'].get('useAllNodes', False) @@ -514,12 +527,14 @@ class RookCluster(object): # to anything we put in 'nodes', so can't do OSD creation. return not use_all_nodes - def node_exists(self, node_name): + def node_exists(self, node_name: str) -> bool: return node_name in self.get_node_names() - def update_mon_count(self, newcount): + def update_mon_count(self, newcount: Optional[int]) -> str: def _update_mon_count(current, new): # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster + if newcount is None: + raise orchestrator.OrchestratorError('unable to set mon count to None') new.spec.mon.count = newcount return new return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count) @@ -588,7 +603,7 @@ class RookCluster(object): return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds) - def _patch(self, crd, crd_name, cr_name, func): + def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str: current_json = self.rook_api_get( "{}/{}".format(crd_name, cr_name) ) @@ -616,7 +631,12 @@ class RookCluster(object): return "Success" - def _create_or_patch(self, crd, crd_name, cr_name, update_func, create_func): + def _create_or_patch(self, + crd: Type, + crd_name: str, + cr_name: str, + update_func: Callable[[CrdClassT], CrdClassT], + create_func: Callable[[], CrdClassT]) -> str: try: current_json = self.rook_api_get( "{}/{}".format(crd_name, cr_name) -- 2.39.5