]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rook: disallow_untyped_defs = True
authorSebastian Wagner <sebastian.wagner@suse.com>
Thu, 28 Jan 2021 11:44:07 +0000 (12:44 +0100)
committerKefu Chai <kchai@redhat.com>
Sun, 31 Jan 2021 03:18:58 +0000 (11:18 +0800)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/mypy.ini
src/pybind/mgr/rook/module.py
src/pybind/mgr/rook/rook_cluster.py

index 8bd3a1d6c263f67214a88235e02c26324f7483c2..61b930737d01b84d59ba175e6313665263a71877 100755 (executable)
@@ -33,9 +33,17 @@ disallow_untyped_defs = True
 [mypy-iostat.*]
 disallow_untyped_defs = True
 
+>>>>>>> 514d3b6785... mgr/rook: disallow_untyped_defs = True
 [mypy-orchestrator.*]
 disallow_untyped_defs = True
 
+[mypy-rook.*]
+disallow_untyped_defs = True
+
+# external import
+[mypy-rook.rook_client.*]
+disallow_untyped_defs = False
+
 [mypy-snap_schedule.*]
 disallow_untyped_defs = True
 
index c6a6293b4d5bbd29531979187f29d1479826b588..becbe8ef416101b9348a5b7c5fa89820eea61bde 100644 (file)
@@ -7,8 +7,9 @@ import json
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
 
+from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple
+
 try:
-    from typing import List, Dict, Optional, Callable, Any
     from ceph.deployment.drive_group import DriveGroupSpec
 except ImportError:
     pass  # just for type checking
@@ -21,7 +22,7 @@ try:
 
     # https://github.com/kubernetes-client/python/issues/895
     from kubernetes.client.models.v1_container_image import V1ContainerImage
-    def names(self, names):
+    def names(self: Any, names: Any) -> None:
         self._names = names
     V1ContainerImage.names = V1ContainerImage.names.setter(names)
 
@@ -35,32 +36,39 @@ import orchestrator
 
 from .rook_cluster import RookCluster
 
+T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable)
+ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)
+
+
 
-class RookCompletion(orchestrator.Completion):
-    def evaluate(self):
+class RookCompletion(orchestrator.Completion[T]):
+    def evaluate(self) -> None:
         self.finalize(None)
 
 
 def deferred_read(f):
-    # type: (Callable) -> Callable[..., RookCompletion]
+    # type: (Callable[..., T]) -> Callable[..., RookCompletion[T]]
+
+    # See https://stackoverflow.com/questions/65936408/typing-function-when-decorator-change-generic-return-type
     """
     Decorator to make RookOrchestrator methods return
     a completion object that executes themselves.
     """
 
     @functools.wraps(f)
-    def wrapper(*args, **kwargs):
+    def wrapper(*args: Any, **kwargs: Any) -> RookCompletion[T]:
         return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
 
     return wrapper
 
 
-def write_completion(on_complete,  # type: Callable
+def write_completion(on_complete,  # type: Callable[[], T]
                      message,  # type: str
-                     mgr,
+                     mgr: 'RookOrchestrator',
                      calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
                      ):
-    # type: (...) -> RookCompletion
+    # type: (...) -> RookCompletion[T]
     return RookCompletion.with_progress(
         message=message,
         mgr=mgr,
@@ -70,7 +78,7 @@ def write_completion(on_complete,  # type: Callable
 
 
 class RookEnv(object):
-    def __init__(self):
+    def __init__(self) -> None:
         # POD_NAMESPACE already exist for Rook 0.9
         self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')
 
@@ -81,10 +89,10 @@ class RookEnv(object):
         self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
         self.api_name = "ceph.rook.io/" + self.crd_version
 
-    def api_version_match(self):
+    def api_version_match(self) -> bool:
         return self.crd_version == 'v1'
 
-    def has_namespace(self):
+    def has_namespace(self) -> bool:
         return 'POD_NAMESPACE' in os.environ
 
 
@@ -110,14 +118,14 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 p.evaluate()
 
     @staticmethod
-    def can_run():
+    def can_run() -> Tuple[bool, str]:
         if not kubernetes_imported:
             return False, "`kubernetes` python module not found"
         if not RookEnv().api_version_match():
             return False, "Rook version unsupported."
         return True, ''
 
-    def available(self):
+    def available(self) -> Tuple[bool, str]:
         if not kubernetes_imported:
             return False, "`kubernetes` python module not found"
         elif not self._rook_env.has_namespace():
@@ -130,20 +138,20 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         else:
             return True, ""
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         super(RookOrchestrator, self).__init__(*args, **kwargs)
 
         self._initialized = threading.Event()
-        self._k8s_CoreV1_api = None
-        self._k8s_BatchV1_api = None
-        self._rook_cluster = None
+        self._k8s_CoreV1_api: Optional[client.CoreV1Api] = None
+        self._k8s_BatchV1_api: Optional[client.BatchV1Api] = None
+        self._rook_cluster: Optional[RookCluster] = None
         self._rook_env = RookEnv()
 
         self._shutdown = threading.Event()
 
         self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         self._shutdown.set()
 
     @property
@@ -160,7 +168,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         assert self._rook_cluster is not None
         return self._rook_cluster
 
-    def serve(self):
+    def serve(self) -> None:
         # For deployed clusters, we should always be running inside
         # a Rook cluster.  For development convenience, also support
         # running outside (reading ~/.kube config)
@@ -206,13 +214,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
             self._shutdown.wait(5)
 
-    def cancel_completions(self):
+    def cancel_completions(self) -> None:
         for p in self.all_progress_references:
             p.fail()
         self.all_progress_references.clear()
 
     @deferred_read
-    def get_inventory(self, host_filter=None, refresh=False):
+    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
         host_list = None
         if host_filter and host_filter.hosts:
             # Explicit host list
@@ -222,10 +230,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             # it into RookCluster.get_discovered_devices
             raise NotImplementedError()
 
-        devs = self.rook_cluster.get_discovered_devices(host_list)
+        discovered_devs = self.rook_cluster.get_discovered_devices(host_list)
 
         result = []
-        for host_name, host_devs in devs.items():
+        for host_name, host_devs in discovered_devs.items():
             devs = []
             for d in host_devs:
                 if 'cephVolumeData' in d and d['cephVolumeData']:
@@ -251,8 +259,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
 
     @deferred_read
-    def describe_service(self, service_type=None, service_name=None,
-                         refresh=False):
+    def describe_service(self,
+                         service_type: Optional[str] = None,
+                         service_name: Optional[str] = None,
+                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
         now = datetime.datetime.utcnow()
 
         # CephCluster
@@ -384,24 +394,32 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 service.container_image_id = dd.container_image_id
             if not service.container_image_name:
                 service.container_image_name = dd.container_image_name
-            if not service.last_refresh or not dd.last_refresh or dd.last_refresh < service.last_refresh:
+            if service.last_refresh is None or not dd.last_refresh or dd.last_refresh < service.last_refresh:
                 service.last_refresh = dd.last_refresh
-            if not service.created or dd.created < service.created:
+            if service.created is None or dd.created is None or dd.created < service.created:
                 service.created = dd.created
 
         return [v for k, v in spec.items()]
 
     @deferred_read
-    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
-                     refresh=False):
+    def list_daemons(self,
+                     service_name: Optional[str] = None,
+                     daemon_type: Optional[str] = None,
+                     daemon_id: Optional[str] = None,
+                     host: Optional[str] = None,
+                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
         return self._list_daemons(service_name=service_name,
                                   daemon_type=daemon_type,
                                   daemon_id=daemon_id,
                                   host=host,
                                   refresh=refresh)
 
-    def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
-                      refresh=False):
+    def _list_daemons(self,
+                      service_name: Optional[str] = None,
+                      daemon_type: Optional[str] = None,
+                      daemon_id: Optional[str] = None,
+                      host: Optional[str] = None,
+                      refresh: bool = False) -> List[orchestrator.DaemonDescription]:
         pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
         self.log.debug('pods %s' % pods)
         result = []
@@ -440,21 +458,21 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return result
 
-    def _service_add_decorate(self, typename, spec, func):
+    def _service_add_decorate(self, typename: str, spec: ServiceSpecT, func: Callable[[ServiceSpecT], T]) -> RookCompletion[T]:
         return write_completion(
             on_complete=lambda : func(spec),
             message="Creating {} services for {}".format(typename, spec.service_id),
             mgr=self
         )
 
-    def _service_rm_decorate(self, typename, name, func):
+    def _service_rm_decorate(self, typename: str, name: str, func: Callable[[], T]) -> RookCompletion[T]:
         return write_completion(
             on_complete=lambda : func(),
             message="Removing {} services for {}".format(typename, name),
             mgr=self
         )
 
-    def remove_service(self, service_name):
+    def remove_service(self, service_name: str) -> RookCompletion[str]:
         service_type, service_name = service_name.split('.', 1)
         if service_type == 'mds':
             return self._service_rm_decorate(
@@ -469,9 +487,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             return self._service_rm_decorate(
                 'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
             )
+        else:
+            raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
 
     def apply_mon(self, spec):
-        # type: (ServiceSpec) -> RookCompletion
+        # type: (ServiceSpec) -> RookCompletion[str]
         if spec.placement.hosts or spec.placement.label:
             raise RuntimeError("Host list or label is not supported by rook.")
 
@@ -482,21 +502,21 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         )
 
     def apply_mds(self, spec):
-        # type: (ServiceSpec) -> RookCompletion
+        # type: (ServiceSpec) -> RookCompletion[str]
         return self._service_add_decorate('MDS', spec,
                                           self.rook_cluster.apply_filesystem)
 
     def apply_rgw(self, spec):
-        # type: (RGWSpec) -> RookCompletion
+        # type: (RGWSpec) -> RookCompletion[str]
         return self._service_add_decorate('RGW', spec,
                                           self.rook_cluster.apply_objectstore)
 
     def apply_nfs(self, spec):
-        # type: (NFSServiceSpec) -> RookCompletion
+        # type: (NFSServiceSpec) -> RookCompletion[str]
         return self._service_add_decorate("NFS", spec,
                                           self.rook_cluster.apply_nfsgw)
 
-    def remove_daemons(self, names):
+    def remove_daemons(self, names: List[str]) -> RookCompletion[List[str]]:
         return write_completion(
             lambda: self.rook_cluster.remove_pods(names),
             "Removing daemons {}".format(','.join(names)),
@@ -504,7 +524,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         )
 
     def create_osds(self, drive_group):
-        # type: (DriveGroupSpec) -> RookCompletion
+        # type: (DriveGroupSpec) -> RookCompletion[str]
         """ Creates OSDs from a drive group specification.
 
         $: ceph orch osd create -i <dg.file>
@@ -544,7 +564,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             )
 
         @deferred_read
-        def has_osds(matching_hosts):
+        def has_osds(matching_hosts: List[str]) -> bool:
 
             # Find OSD pods on this host
             pod_osd_ids = set()
index d40613090b5a6e1a67e775f393e22418273d65b7..75e3fb9129bea37b8aee3eef3788fdcdfb9092d0 100644 (file)
@@ -21,13 +21,11 @@ from urllib.parse import urljoin
 from urllib3.exceptions import ProtocolError
 
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec
 from mgr_util import merge_dicts
 
-try:
-    from typing import Optional
-except ImportError:
-    pass  # just for type annotations
+from typing import Optional, TypeVar, List, Callable, Any, cast, Generic, \
+    Iterable, Dict, Iterator, Type
 
 try:
     from kubernetes import client, watch
@@ -40,6 +38,7 @@ from .rook_client.ceph import cephfilesystem as cfs
 from .rook_client.ceph import cephnfs as cnfs
 from .rook_client.ceph import cephobjectstore as cos
 from .rook_client.ceph import cephcluster as ccl
+from .rook_client._helper import CrdClass
 
 
 import orchestrator
@@ -47,14 +46,20 @@ import orchestrator
 
 try:
     from rook.module import RookEnv
-    from typing import List, Dict
 except ImportError:
     pass  # just used for type checking.
 
+
+T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable)
+
+CrdClassT = TypeVar('CrdClassT', bound=CrdClass)
+
+
 log = logging.getLogger(__name__)
 
 
-def _urllib3_supports_read_chunked():
+def __urllib3_supports_read_chunked() -> bool:
     # There is a bug in CentOS 7 as it ships a urllib3 which is lower
     # than required by kubernetes-client
     try:
@@ -64,7 +69,7 @@ def _urllib3_supports_read_chunked():
         return False
 
 
-_urllib3_supports_read_chunked = _urllib3_supports_read_chunked()
+_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()
 
 class ApplyException(orchestrator.OrchestratorError):
     """
@@ -74,17 +79,17 @@ class ApplyException(orchestrator.OrchestratorError):
     """
 
 
-def threaded(f):
-    def wrapper(*args, **kwargs):
+def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
+    def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
         t = threading.Thread(target=f, args=args, kwargs=kwargs)
         t.start()
         return t
 
-    return wrapper
+    return cast(Callable[..., threading.Thread], wrapper)
 
 
-class KubernetesResource(object):
-    def __init__(self, api_func, **kwargs):
+class KubernetesResource(Generic[T]):
+    def __init__(self, api_func: Callable, **kwargs: Any) -> None:
         """
         Generic kubernetes Resource parent class
 
@@ -99,13 +104,13 @@ class KubernetesResource(object):
         self.api_func = api_func
 
         # ``_items`` is accessed by different threads. I assume assignment is atomic.
-        self._items = dict()
+        self._items: Dict[str, T] = dict()
         self.thread = None  # type: Optional[threading.Thread]
-        self.exception = None
+        self.exception: Optional[Exception] = None
         if not _urllib3_supports_read_chunked:
             logging.info('urllib3 is too old. Fallback to full fetches')
 
-    def _fetch(self):
+    def _fetch(self) -> str:
         """ Execute the requested api method as a one-off fetch"""
         response = self.api_func(**self.kwargs)
         # metadata is a client.V1ListMeta object type
@@ -115,7 +120,7 @@ class KubernetesResource(object):
         return metadata.resource_version
 
     @property
-    def items(self):
+    def items(self) -> Iterable[T]:
         """
         Returns the items of the request.
         Creates the watcher as a side effect.
@@ -135,7 +140,7 @@ class KubernetesResource(object):
         return self._items.values()
 
     @threaded
-    def _watch(self, res_ver):
+    def _watch(self, res_ver: Optional[str]) -> None:
         """ worker thread that runs the kubernetes watch """
 
         self.exception = None
@@ -183,29 +188,31 @@ class KubernetesResource(object):
 
 
 class RookCluster(object):
-    def __init__(self, coreV1_api, batchV1_api, rook_env):
+    # import of client.CoreV1Api must be optional at import time.
+    # Instead allow mgr/rook to be imported anyway.
+    def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', rook_env: 'RookEnv'):
         self.rook_env = rook_env  # type: RookEnv
         self.coreV1_api = coreV1_api  # client.CoreV1Api
         self.batchV1_api = batchV1_api
 
         #  TODO: replace direct k8s calls with Rook API calls
         # when they're implemented
-        self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
+        self.inventory_maps: KubernetesResource[client.V1ConfigMapList] = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
                                                  namespace=self.rook_env.operator_namespace,
                                                  label_selector="app=rook-discover")
 
-        self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod,
+        self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                             namespace=self.rook_env.namespace,
                                             label_selector="rook_cluster={0}".format(
                                                 self.rook_env.namespace))
-        self.nodes = KubernetesResource(self.coreV1_api.list_node)
+        self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
 
-    def rook_url(self, path):
+    def rook_url(self, path: str) -> str:
         prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
             self.rook_env.crd_version, self.rook_env.namespace)
         return urljoin(prefix, path)
 
-    def rook_api_call(self, verb, path, **kwargs):
+    def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
         full_path = self.rook_url(path)
         log.debug("[%s] %s" % (verb, full_path))
 
@@ -218,22 +225,22 @@ class RookCluster(object):
             _preload_content=True,
             **kwargs)
 
-    def rook_api_get(self, path, **kwargs):
+    def rook_api_get(self, path: str, **kwargs: Any) -> Any:
         return self.rook_api_call("GET", path, **kwargs)
 
-    def rook_api_delete(self, path):
+    def rook_api_delete(self, path: str) -> Any:
         return self.rook_api_call("DELETE", path)
 
-    def rook_api_patch(self, path, **kwargs):
+    def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
         return self.rook_api_call("PATCH", path,
                                   header_params={"Content-Type": "application/json-patch+json"},
                                   **kwargs)
 
-    def rook_api_post(self, path, **kwargs):
+    def rook_api_post(self, path: str, **kwargs: Any) -> Any:
         return self.rook_api_call("POST", path, **kwargs)
 
-    def get_discovered_devices(self, nodenames=None):
-        def predicate(item):
+    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, dict]:
+        def predicate(item: client.V1ConfigMapList) -> bool:
             if nodenames is not None:
                 return item.metadata.labels['rook.io/node'] in nodenames
             else:
@@ -252,7 +259,7 @@ class RookCluster(object):
 
         return nodename_to_devices
 
-    def get_nfs_conf_url(self, nfs_cluster, instance):
+    def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
         #
         # Fetch cephnfs object for "nfs_cluster" and then return a rados://
         # URL for the instance within that cluster. If the fetch fails, just
@@ -273,7 +280,10 @@ class RookCluster(object):
             url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
         return url
 
-    def describe_pods(self, service_type, service_id, nodename):
+    def describe_pods(self,
+                      service_type: Optional[str],
+                      service_id: Optional[str],
+                      nodename: Optional[str]) -> List[Dict[str, Any]]:
         """
         Go query the k8s API about deployment, containers related to this
         filesystem
@@ -358,9 +368,8 @@ class RookCluster(object):
 
         return pods_summary
 
-    def remove_pods(self, names):
+    def remove_pods(self, names: List[str]) -> List[str]:
         pods = [i for i in self.rook_pods.items]
-        num = 0
         for p in pods:
             d = p.to_dict()
             daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-','')
@@ -372,14 +381,13 @@ class RookCluster(object):
                     self.rook_env.namespace,
                     body=client.V1DeleteOptions()
                 )
-                num += 1
-        return "Removed %d pods" % num
+        return [f'Removed Pod {n}' for n in names]
 
-    def get_node_names(self):
+    def get_node_names(self) -> List[str]:
         return [i.metadata.name for i in self.nodes.items]
 
     @contextmanager
-    def ignore_409(self, what):
+    def ignore_409(self, what: str) -> Iterator[None]:
         try:
             yield
         except ApiException as e:
@@ -389,7 +397,7 @@ class RookCluster(object):
             else:
                 raise
 
-    def apply_filesystem(self, spec: ServiceSpec) -> None:
+    def apply_filesystem(self, spec: ServiceSpec) -> str:
         # TODO use spec.placement
         # TODO warn if spec.extended has entries we don't kow how
         #      to action.
@@ -411,15 +419,17 @@ class RookCluster(object):
                     )
                 )
             )
+        assert spec.service_id is not None
         return self._create_or_patch(
             cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
             _update_fs, _create_fs)
 
-    def apply_objectstore(self, spec):
+    def apply_objectstore(self, spec: RGWSpec) -> str:
 
         # FIXME: service_id is $realm.$zone, but rook uses realm
         # $crname and zone $crname.  The '.'  will confuse kubernetes.
         # For now, assert that realm==zone.
+        assert spec.service_id is not None
         (realm, zone) = spec.service_id.split('.', 1)
         assert realm == zone
         assert spec.subcluster is None
@@ -456,7 +466,7 @@ class RookCluster(object):
             cos.CephObjectStore, 'cephobjectstores', name,
             _update_zone, _create_zone)
 
-    def apply_nfsgw(self, spec: NFSServiceSpec) -> None:
+    def apply_nfsgw(self, spec: NFSServiceSpec) -> str:
         # TODO use spec.placement
         # TODO warn if spec.extended has entries we don't kow how
         #      to action.
@@ -489,10 +499,11 @@ class RookCluster(object):
 
             return rook_nfsgw
 
+        assert spec.service_id is not None
         return self._create_or_patch(cnfs.CephNFS, 'cephnfses', spec.service_id,
                 _update_nfs, _create_nfs)
 
-    def rm_service(self, rooktype, service_id):
+    def rm_service(self, rooktype: str, service_id: str) -> str:
 
         objpath = "{0}/{1}".format(rooktype, service_id)
 
@@ -505,7 +516,9 @@ class RookCluster(object):
             else:
                 raise
 
-    def can_create_osd(self):
+        return f'Removed {objpath}'
+
+    def can_create_osd(self) -> bool:
         current_cluster = self.rook_api_get(
             "cephclusters/{0}".format(self.rook_env.cluster_name))
         use_all_nodes = current_cluster['spec'].get('useAllNodes', False)
@@ -514,12 +527,14 @@ class RookCluster(object):
         # to anything we put in 'nodes', so can't do OSD creation.
         return not use_all_nodes
 
-    def node_exists(self, node_name):
+    def node_exists(self, node_name: str) -> bool:
         return node_name in self.get_node_names()
 
-    def update_mon_count(self, newcount):
+    def update_mon_count(self, newcount: Optional[int]) -> str:
         def _update_mon_count(current, new):
             # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            if newcount is None:
+                raise orchestrator.OrchestratorError('unable to set mon count to None')
             new.spec.mon.count = newcount
             return new
         return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
@@ -588,7 +603,7 @@ class RookCluster(object):
 
         return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
 
-    def _patch(self, crd, crd_name, cr_name, func):
+    def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
         current_json = self.rook_api_get(
             "{}/{}".format(crd_name, cr_name)
         )
@@ -616,7 +631,12 @@ class RookCluster(object):
 
         return "Success"
 
-    def _create_or_patch(self, crd, crd_name, cr_name, update_func, create_func):
+    def _create_or_patch(self,
+                         crd: Type,
+                         crd_name: str,
+                         cr_name: str,
+                         update_func: Callable[[CrdClassT], CrdClassT],
+                         create_func: Callable[[], CrdClassT]) -> str:
         try:
             current_json = self.rook_api_get(
                 "{}/{}".format(crd_name, cr_name)