]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rook: disallow_untyped_defs = True
authorSebastian Wagner <sebastian.wagner@suse.com>
Thu, 28 Jan 2021 11:44:07 +0000 (12:44 +0100)
committerSebastian Wagner <sewagner@redhat.com>
Thu, 17 Jun 2021 11:05:30 +0000 (13:05 +0200)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
(cherry picked from commit a0930a63106e9f0d556e18bd70ffca6566388742)

src/mypy.ini
src/pybind/mgr/rook/module.py
src/pybind/mgr/rook/rook_cluster.py

index f50ca2ee7e3fc528c5b4aee3614989414ef73049..ce8a36ee6a7944bb837cf7c61e181498e55a9b8c 100755 (executable)
@@ -40,6 +40,13 @@ disallow_untyped_defs = True
 [mypy-prometheus.*]
 disallow_untyped_defs = True
 
+[mypy-rook.*]
+disallow_untyped_defs = True
+
+# external import
+[mypy-rook.rook_client.*]
+disallow_untyped_defs = False
+
 # Make cephadm and rook happy
 [mypy-OpenSSL]
 ignore_missing_imports = True
index cd9f578a180475f9bf47c6b88047bb5c7e624b9b..bac62bfc355c65ad73953894937819ea17497acf 100644 (file)
@@ -7,8 +7,9 @@ import json
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
 
+from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple
+
 try:
-    from typing import List, Dict, Optional, Callable, Any, Tuple
     from ceph.deployment.drive_group import DriveGroupSpec
 except ImportError:
     pass  # just for type checking
@@ -21,7 +22,7 @@ try:
 
     # https://github.com/kubernetes-client/python/issues/895
     from kubernetes.client.models.v1_container_image import V1ContainerImage
-    def names(self, names):
+    def names(self: Any, names: Any) -> None:
         self._names = names
     V1ContainerImage.names = V1ContainerImage.names.setter(names)
 
@@ -36,11 +37,14 @@ from orchestrator import handle_orch_error, OrchResult, raise_if_exception
 
 from .rook_cluster import RookCluster
 
+T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable)
+ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)
 
 
 
 class RookEnv(object):
-    def __init__(self):
+    def __init__(self) -> None:
         # POD_NAMESPACE already exist for Rook 0.9
         self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')
 
@@ -51,10 +55,10 @@ class RookEnv(object):
         self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
         self.api_name = "ceph.rook.io/" + self.crd_version
 
-    def api_version_match(self):
+    def api_version_match(self) -> bool:
         return self.crd_version == 'v1'
 
-    def has_namespace(self):
+    def has_namespace(self) -> bool:
         return 'POD_NAMESPACE' in os.environ
 
 
@@ -73,7 +77,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
     ]
 
     @staticmethod
-    def can_run():
+    def can_run() -> Tuple[bool, str]:
         if not kubernetes_imported:
             return False, "`kubernetes` python module not found"
         if not RookEnv().api_version_match():
@@ -93,13 +97,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         else:
             return True, "", {}
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         super(RookOrchestrator, self).__init__(*args, **kwargs)
 
         self._initialized = threading.Event()
-        self._k8s_CoreV1_api = None
-        self._k8s_BatchV1_api = None
-        self._rook_cluster = None
+        self._k8s_CoreV1_api: Optional[client.CoreV1Api] = None
+        self._k8s_BatchV1_api: Optional[client.BatchV1Api] = None
+        self._rook_cluster: Optional[RookCluster] = None
         self._rook_env = RookEnv()
 
         self._shutdown = threading.Event()
@@ -121,7 +125,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         assert self._rook_cluster is not None
         return self._rook_cluster
 
-    def serve(self):
+    def serve(self) -> None:
         # For deployed clusters, we should always be running inside
         # a Rook cluster.  For development convenience, also support
         # running outside (reading ~/.kube config)
@@ -160,7 +164,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             self._shutdown.wait(5)
 
     @handle_orch_error
-    def get_inventory(self, host_filter=None, refresh=False):
+    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
         host_list = None
         if host_filter and host_filter.hosts:
             # Explicit host list
@@ -170,10 +174,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             # it into RookCluster.get_discovered_devices
             raise NotImplementedError()
 
-        devs = self.rook_cluster.get_discovered_devices(host_list)
+        discovered_devs = self.rook_cluster.get_discovered_devices(host_list)
 
         result = []
-        for host_name, host_devs in devs.items():
+        for host_name, host_devs in discovered_devs.items():
             devs = []
             for d in host_devs:
                 if 'cephVolumeData' in d and d['cephVolumeData']:
@@ -199,8 +203,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
 
     @handle_orch_error
-    def describe_service(self, service_type=None, service_name=None,
-                         refresh=False):
+    def describe_service(self,
+                         service_type: Optional[str] = None,
+                         service_name: Optional[str] = None,
+                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
         now = datetime.datetime.utcnow()
 
         # CephCluster
@@ -332,24 +338,32 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 service.container_image_id = dd.container_image_id
             if not service.container_image_name:
                 service.container_image_name = dd.container_image_name
-            if not service.last_refresh or not dd.last_refresh or dd.last_refresh < service.last_refresh:
+            if service.last_refresh is None or not dd.last_refresh or dd.last_refresh < service.last_refresh:
                 service.last_refresh = dd.last_refresh
-            if not service.created or dd.created < service.created:
+            if service.created is None or dd.created is None or dd.created < service.created:
                 service.created = dd.created
 
         return [v for k, v in spec.items()]
 
     @handle_orch_error
-    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
-                     refresh=False):
+    def list_daemons(self,
+                     service_name: Optional[str] = None,
+                     daemon_type: Optional[str] = None,
+                     daemon_id: Optional[str] = None,
+                     host: Optional[str] = None,
+                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
         return self._list_daemons(service_name=service_name,
                                   daemon_type=daemon_type,
                                   daemon_id=daemon_id,
                                   host=host,
                                   refresh=refresh)
 
-    def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
-                      refresh=False):
+    def _list_daemons(self,
+                      service_name: Optional[str] = None,
+                      daemon_type: Optional[str] = None,
+                      daemon_id: Optional[str] = None,
+                      host: Optional[str] = None,
+                      refresh: bool = False) -> List[orchestrator.DaemonDescription]:
         pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
         self.log.debug('pods %s' % pods)
         result = []
index 7db2f38ceb6cab058c03d8d5f7a9e822a721bf0b..d073146d1c85cbd5416413b20f966688a1e2c536 100644 (file)
@@ -21,13 +21,11 @@ from urllib.parse import urljoin
 from urllib3.exceptions import ProtocolError
 
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec
 from mgr_util import merge_dicts
 
-try:
-    from typing import Optional
-except ImportError:
-    pass  # just for type annotations
+from typing import Optional, TypeVar, List, Callable, Any, cast, Generic, \
+    Iterable, Dict, Iterator, Type
 
 try:
     from kubernetes import client, watch
@@ -40,6 +38,7 @@ from .rook_client.ceph import cephfilesystem as cfs
 from .rook_client.ceph import cephnfs as cnfs
 from .rook_client.ceph import cephobjectstore as cos
 from .rook_client.ceph import cephcluster as ccl
+from .rook_client._helper import CrdClass
 
 
 import orchestrator
@@ -47,14 +46,20 @@ import orchestrator
 
 try:
     from rook.module import RookEnv
-    from typing import List, Dict
 except ImportError:
     pass  # just used for type checking.
 
+
+T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable)
+
+CrdClassT = TypeVar('CrdClassT', bound=CrdClass)
+
+
 log = logging.getLogger(__name__)
 
 
-def _urllib3_supports_read_chunked():
+def __urllib3_supports_read_chunked() -> bool:
     # There is a bug in CentOS 7 as it ships a urllib3 which is lower
     # than required by kubernetes-client
     try:
@@ -64,7 +69,7 @@ def _urllib3_supports_read_chunked():
         return False
 
 
-_urllib3_supports_read_chunked = _urllib3_supports_read_chunked()
+_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()
 
 class ApplyException(orchestrator.OrchestratorError):
     """
@@ -74,17 +79,17 @@ class ApplyException(orchestrator.OrchestratorError):
     """
 
 
-def threaded(f):
-    def wrapper(*args, **kwargs):
+def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
+    def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
         t = threading.Thread(target=f, args=args, kwargs=kwargs)
         t.start()
         return t
 
-    return wrapper
+    return cast(Callable[..., threading.Thread], wrapper)
 
 
-class KubernetesResource(object):
-    def __init__(self, api_func, **kwargs):
+class KubernetesResource(Generic[T]):
+    def __init__(self, api_func: Callable, **kwargs: Any) -> None:
         """
         Generic kubernetes Resource parent class
 
@@ -99,13 +104,13 @@ class KubernetesResource(object):
         self.api_func = api_func
 
         # ``_items`` is accessed by different threads. I assume assignment is atomic.
-        self._items = dict()
+        self._items: Dict[str, T] = dict()
         self.thread = None  # type: Optional[threading.Thread]
-        self.exception = None
+        self.exception: Optional[Exception] = None
         if not _urllib3_supports_read_chunked:
             logging.info('urllib3 is too old. Fallback to full fetches')
 
-    def _fetch(self):
+    def _fetch(self) -> str:
         """ Execute the requested api method as a one-off fetch"""
         response = self.api_func(**self.kwargs)
         # metadata is a client.V1ListMeta object type
@@ -115,7 +120,7 @@ class KubernetesResource(object):
         return metadata.resource_version
 
     @property
-    def items(self):
+    def items(self) -> Iterable[T]:
         """
         Returns the items of the request.
         Creates the watcher as a side effect.
@@ -135,7 +140,7 @@ class KubernetesResource(object):
         return self._items.values()
 
     @threaded
-    def _watch(self, res_ver):
+    def _watch(self, res_ver: Optional[str]) -> None:
         """ worker thread that runs the kubernetes watch """
 
         self.exception = None
@@ -183,29 +188,31 @@ class KubernetesResource(object):
 
 
 class RookCluster(object):
-    def __init__(self, coreV1_api, batchV1_api, rook_env):
+    # import of client.CoreV1Api must be optional at import time.
+    # Instead allow mgr/rook to be imported anyway.
+    def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', rook_env: 'RookEnv'):
         self.rook_env = rook_env  # type: RookEnv
         self.coreV1_api = coreV1_api  # client.CoreV1Api
         self.batchV1_api = batchV1_api
 
         #  TODO: replace direct k8s calls with Rook API calls
         # when they're implemented
-        self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
+        self.inventory_maps: KubernetesResource[client.V1ConfigMapList] = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
                                                  namespace=self.rook_env.operator_namespace,
                                                  label_selector="app=rook-discover")
 
-        self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod,
+        self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                             namespace=self.rook_env.namespace,
                                             label_selector="rook_cluster={0}".format(
                                                 self.rook_env.namespace))
-        self.nodes = KubernetesResource(self.coreV1_api.list_node)
+        self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
 
-    def rook_url(self, path):
+    def rook_url(self, path: str) -> str:
         prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
             self.rook_env.crd_version, self.rook_env.namespace)
         return urljoin(prefix, path)
 
-    def rook_api_call(self, verb, path, **kwargs):
+    def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
         full_path = self.rook_url(path)
         log.debug("[%s] %s" % (verb, full_path))
 
@@ -218,22 +225,22 @@ class RookCluster(object):
             _preload_content=True,
             **kwargs)
 
-    def rook_api_get(self, path, **kwargs):
+    def rook_api_get(self, path: str, **kwargs: Any) -> Any:
         return self.rook_api_call("GET", path, **kwargs)
 
-    def rook_api_delete(self, path):
+    def rook_api_delete(self, path: str) -> Any:
         return self.rook_api_call("DELETE", path)
 
-    def rook_api_patch(self, path, **kwargs):
+    def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
         return self.rook_api_call("PATCH", path,
                                   header_params={"Content-Type": "application/json-patch+json"},
                                   **kwargs)
 
-    def rook_api_post(self, path, **kwargs):
+    def rook_api_post(self, path: str, **kwargs: Any) -> Any:
         return self.rook_api_call("POST", path, **kwargs)
 
-    def get_discovered_devices(self, nodenames=None):
-        def predicate(item):
+    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, dict]:
+        def predicate(item: client.V1ConfigMapList) -> bool:
             if nodenames is not None:
                 return item.metadata.labels['rook.io/node'] in nodenames
             else:
@@ -252,7 +259,7 @@ class RookCluster(object):
 
         return nodename_to_devices
 
-    def get_nfs_conf_url(self, nfs_cluster, instance):
+    def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
         #
         # Fetch cephnfs object for "nfs_cluster" and then return a rados://
         # URL for the instance within that cluster. If the fetch fails, just
@@ -273,7 +280,10 @@ class RookCluster(object):
             url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
         return url
 
-    def describe_pods(self, service_type, service_id, nodename):
+    def describe_pods(self,
+                      service_type: Optional[str],
+                      service_id: Optional[str],
+                      nodename: Optional[str]) -> List[Dict[str, Any]]:
         """
         Go query the k8s API about deployment, containers related to this
         filesystem
@@ -358,9 +368,8 @@ class RookCluster(object):
 
         return pods_summary
 
-    def remove_pods(self, names):
+    def remove_pods(self, names: List[str]) -> List[str]:
         pods = [i for i in self.rook_pods.items]
-        num = 0
         for p in pods:
             d = p.to_dict()
             daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-','')
@@ -372,14 +381,13 @@ class RookCluster(object):
                     self.rook_env.namespace,
                     body=client.V1DeleteOptions()
                 )
-                num += 1
-        return "Removed %d pods" % num
+        return [f'Removed Pod {n}' for n in names]
 
-    def get_node_names(self):
+    def get_node_names(self) -> List[str]:
         return [i.metadata.name for i in self.nodes.items]
 
     @contextmanager
-    def ignore_409(self, what):
+    def ignore_409(self, what: str) -> Iterator[None]:
         try:
             yield
         except ApiException as e:
@@ -411,11 +419,12 @@ class RookCluster(object):
                     )
                 )
             )
+        assert spec.service_id is not None
         return self._create_or_patch(
             cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
             _update_fs, _create_fs)
 
-    def apply_objectstore(self, spec):
+    def apply_objectstore(self, spec: RGWSpec) -> str:
         assert spec.service_id is not None
 
         name = spec.service_id
@@ -494,10 +503,11 @@ class RookCluster(object):
 
             return rook_nfsgw
 
+        assert spec.service_id is not None
         return self._create_or_patch(cnfs.CephNFS, 'cephnfses', spec.service_id,
                 _update_nfs, _create_nfs)
 
-    def rm_service(self, rooktype, service_id):
+    def rm_service(self, rooktype: str, service_id: str) -> str:
 
         objpath = "{0}/{1}".format(rooktype, service_id)
 
@@ -510,7 +520,9 @@ class RookCluster(object):
             else:
                 raise
 
-    def can_create_osd(self):
+        return f'Removed {objpath}'
+
+    def can_create_osd(self) -> bool:
         current_cluster = self.rook_api_get(
             "cephclusters/{0}".format(self.rook_env.cluster_name))
         use_all_nodes = current_cluster['spec'].get('useAllNodes', False)
@@ -519,12 +531,14 @@ class RookCluster(object):
         # to anything we put in 'nodes', so can't do OSD creation.
         return not use_all_nodes
 
-    def node_exists(self, node_name):
+    def node_exists(self, node_name: str) -> bool:
         return node_name in self.get_node_names()
 
-    def update_mon_count(self, newcount):
+    def update_mon_count(self, newcount: Optional[int]) -> str:
         def _update_mon_count(current, new):
             # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            if newcount is None:
+                raise orchestrator.OrchestratorError('unable to set mon count to None')
             new.spec.mon.count = newcount
             return new
         return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
@@ -593,7 +607,7 @@ class RookCluster(object):
 
         return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
 
-    def _patch(self, crd, crd_name, cr_name, func):
+    def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
         current_json = self.rook_api_get(
             "{}/{}".format(crd_name, cr_name)
         )
@@ -621,7 +635,12 @@ class RookCluster(object):
 
         return "Success"
 
-    def _create_or_patch(self, crd, crd_name, cr_name, update_func, create_func):
+    def _create_or_patch(self,
+                         crd: Type,
+                         crd_name: str,
+                         cr_name: str,
+                         update_func: Callable[[CrdClassT], CrdClassT],
+                         create_func: Callable[[], CrdClassT]) -> str:
         try:
             current_json = self.rook_api_get(
                 "{}/{}".format(crd_name, cr_name)