]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephadm: Add missing type annotations 42594/head
authorSebastian Wagner <sewagner@redhat.com>
Tue, 3 Aug 2021 12:38:32 +0000 (14:38 +0200)
committerSebastian Wagner <sewagner@redhat.com>
Tue, 3 Aug 2021 12:38:32 +0000 (14:38 +0200)
Turnd out `command_maintenance` missing an if-branch that returned
None instead of a str

Signed-off-by: Sebastian Wagner <sewagner@redhat.com>
src/cephadm/cephadm

index ea9c316425f6fb2b53f6cb6625354245e98a3733..3d5d462f75252769186369e53a40634f9221f291 100755 (executable)
@@ -31,7 +31,7 @@ from contextlib import redirect_stdout
 import ssl
 from enum import Enum
 
-from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
+from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast
 
 import re
 import uuid
@@ -45,6 +45,8 @@ from urllib.error import HTTPError
 from urllib.request import urlopen
 from pathlib import Path
 
+FuncT = TypeVar('FuncT', bound=Callable)
+
 # Default container images -----------------------------------------------------
 DEFAULT_IMAGE = 'quay.ceph.io/ceph-ci/ceph:master'
 DEFAULT_IMAGE_IS_MASTER = True
@@ -104,7 +106,7 @@ cached_stdin = None
 
 class BaseConfig:
 
-    def __init__(self):
+    def __init__(self) -> None:
         self.image: str = ''
         self.docker: bool = False
         self.data_dir: str = DATA_DIR
@@ -123,7 +125,7 @@ class BaseConfig:
         self.container_init: bool = CONTAINER_INIT
         self.container_engine: Optional[ContainerEngine] = None
 
-    def set_from_args(self, args: argparse.Namespace):
+    def set_from_args(self, args: argparse.Namespace) -> None:
         argdict: Dict[str, Any] = vars(args)
         for k, v in argdict.items():
             if hasattr(self, k):
@@ -132,7 +134,7 @@ class BaseConfig:
 
 class CephadmContext:
 
-    def __init__(self):
+    def __init__(self) -> None:
         self.__dict__['_args'] = None
         self.__dict__['_conf'] = BaseConfig()
 
@@ -164,7 +166,7 @@ class CephadmContext:
 
 
 class ContainerEngine:
-    def __init__(self):
+    def __init__(self) -> None:
         self.path = find_program(self.EXE)
 
     @property
@@ -175,17 +177,17 @@ class ContainerEngine:
 class Podman(ContainerEngine):
     EXE = 'podman'
 
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__()
-        self._version = None
+        self._version: Optional[Tuple[int, ...]] = None
 
     @property
-    def version(self):
+    def version(self) -> Tuple[int, ...]:
         if self._version is None:
             raise RuntimeError('Please call `get_version` first')
         return self._version
 
-    def get_version(self, ctx: CephadmContext):
+    def get_version(self, ctx: CephadmContext) -> None:
         out, _, _ = call_throws(ctx, [self.path, 'version', '--format', '{{.Client.Version}}'])
         self._version = _parse_podman_version(out)
 
@@ -703,7 +705,7 @@ class HAproxy(object):
             cname = '%s-%s' % (cname, desc)
         return cname
 
-    def extract_uid_gid_haproxy(self):
+    def extract_uid_gid_haproxy(self) -> Tuple[int, int]:
         # better directory for this?
         return extract_uid_gid(self.ctx, file_path='/var/lib')
 
@@ -806,7 +808,7 @@ class Keepalived(object):
             'net.ipv4.ip_nonlocal_bind = 1',
         ]
 
-    def extract_uid_gid_keepalived(self):
+    def extract_uid_gid_keepalived(self) -> Tuple[int, int]:
         # better directory for this?
         return extract_uid_gid(self.ctx, file_path='/var/lib')
 
@@ -1071,34 +1073,34 @@ class Timeout(TimeoutError):
     seconds.
     """
 
-    def __init__(self, lock_file):
+    def __init__(self, lock_file: str) -> None:
         """
         """
         #: The path of the file lock.
         self.lock_file = lock_file
         return None
 
-    def __str__(self):
+    def __str__(self) -> str:
         temp = "The file lock '{}' could not be acquired."\
                .format(self.lock_file)
         return temp
 
 
 class _Acquire_ReturnProxy(object):
-    def __init__(self, lock):
+    def __init__(self, lock: 'FileLock') -> None:
         self.lock = lock
         return None
 
-    def __enter__(self):
+    def __enter__(self) -> 'FileLock':
         return self.lock
 
-    def __exit__(self, exc_type, exc_value, traceback):
+    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
         self.lock.release()
         return None
 
 
 class FileLock(object):
-    def __init__(self, ctx: CephadmContext, name, timeout=-1):
+    def __init__(self, ctx: CephadmContext, name: str, timeout: int = -1) -> None:
         if not os.path.exists(LOCK_DIR):
             os.mkdir(LOCK_DIR, 0o700)
         self._lock_file = os.path.join(LOCK_DIR, name + '.lock')
@@ -1117,10 +1119,10 @@ class FileLock(object):
         return None
 
     @property
-    def is_locked(self):
+    def is_locked(self) -> bool:
         return self._lock_file_fd is not None
 
-    def acquire(self, timeout=None, poll_intervall=0.05):
+    def acquire(self, timeout: Optional[int] = None, poll_intervall: float = 0.05) -> _Acquire_ReturnProxy:
         """
         Acquires the file lock or fails with a :exc:`Timeout` error.
         .. code-block:: python
@@ -1187,7 +1189,7 @@ class FileLock(object):
             raise
         return _Acquire_ReturnProxy(lock=self)
 
-    def release(self, force=False):
+    def release(self, force: bool = False) -> None:
         """
         Releases the file lock.
         Please note, that the lock is only completly released, if the lock
@@ -1211,19 +1213,19 @@ class FileLock(object):
 
         return None
 
-    def __enter__(self):
+    def __enter__(self) -> 'FileLock':
         self.acquire()
         return self
 
-    def __exit__(self, exc_type, exc_value, traceback):
+    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
         self.release()
         return None
 
-    def __del__(self):
+    def __del__(self) -> None:
         self.release(force=True)
         return None
 
-    def _acquire(self):
+    def _acquire(self) -> None:
         open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
         fd = os.open(self._lock_file, open_mode)
 
@@ -1235,7 +1237,7 @@ class FileLock(object):
             self._lock_file_fd = fd
         return None
 
-    def _release(self):
+    def _release(self) -> None:
         # Do not remove the lockfile:
         #
         #   https://github.com/benediktschmitt/py-filelock/issues/31
@@ -1277,7 +1279,7 @@ if sys.version_info < (3, 8):
         on amount of spawn processes.
         """
 
-        def __init__(self):
+        def __init__(self) -> None:
             self._pid_counter = itertools.count(0)
             self._threads = {}
 
@@ -1385,7 +1387,7 @@ def call(ctx: CephadmContext,
          desc: Optional[str] = None,
          verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
          timeout: Optional[int] = DEFAULT_TIMEOUT,
-         **kwargs) -> Tuple[str, str, int]:
+         **kwargs: Any) -> Tuple[str, str, int]:
     """
     Wrap subprocess.Popen to
 
@@ -1448,7 +1450,7 @@ def call_throws(
         desc: Optional[str] = None,
         verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
         timeout: Optional[int] = DEFAULT_TIMEOUT,
-        **kwargs) -> Tuple[str, str, int]:
+        **kwargs: Any) -> Tuple[str, str, int]:
     out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs)
     if ret:
         raise RuntimeError('Failed command: %s' % ' '.join(command))
@@ -1474,7 +1476,7 @@ def call_timeout(ctx, command, timeout):
 ##################################
 
 
-def json_loads_retry(cli_func):
+def json_loads_retry(cli_func: Callable[[], str]) -> Any:
     for sleep_secs in [1, 4, 4]:
         try:
             return json.loads(cli_func())
@@ -1579,7 +1581,7 @@ def try_convert_datetime(s):
 
 def _parse_podman_version(version_str):
     # type: (str) -> Tuple[int, ...]
-    def to_int(val, org_e=None):
+    def to_int(val: str, org_e: Optional[Exception] = None) -> int:
         if not val and org_e:
             raise org_e
         try:
@@ -1643,23 +1645,23 @@ def is_fsid(s):
     return True
 
 
-def validate_fsid(func):
+def validate_fsid(func: FuncT) -> FuncT:
     @wraps(func)
-    def _validate_fsid(ctx: CephadmContext):
+    def _validate_fsid(ctx: CephadmContext) -> Any:
         if 'fsid' in ctx and ctx.fsid:
             if not is_fsid(ctx.fsid):
                 raise Error('not an fsid: %s' % ctx.fsid)
         return func(ctx)
-    return _validate_fsid
+    return cast(FuncT, _validate_fsid)
 
 
-def infer_fsid(func):
+def infer_fsid(func: FuncT) -> FuncT:
     """
     If we only find a single fsid in /var/lib/ceph/*, use that
     """
     @infer_config
     @wraps(func)
-    def _infer_fsid(ctx: CephadmContext):
+    def _infer_fsid(ctx: CephadmContext) -> Any:
         if 'fsid' in ctx and ctx.fsid:
             logger.debug('Using specified fsid: %s' % ctx.fsid)
             return func(ctx)
@@ -1693,15 +1695,15 @@ def infer_fsid(func):
             raise Error('Cannot infer an fsid, one must be specified: %s' % fsids)
         return func(ctx)
 
-    return _infer_fsid
+    return cast(FuncT, _infer_fsid)
 
 
-def infer_config(func):
+def infer_config(func: FuncT) -> FuncT:
     """
     If we find a MON daemon, use the config from that container
     """
     @wraps(func)
-    def _infer_config(ctx: CephadmContext):
+    def _infer_config(ctx: CephadmContext) -> Any:
         ctx.config = ctx.config if 'config' in ctx else None
         if ctx.config:
             logger.debug('Using specified config: %s' % ctx.config)
@@ -1723,10 +1725,10 @@ def infer_config(func):
             ctx.config = SHELL_DEFAULT_CONF
         return func(ctx)
 
-    return _infer_config
+    return cast(FuncT, _infer_config)
 
 
-def _get_default_image(ctx: CephadmContext):
+def _get_default_image(ctx: CephadmContext) -> str:
     if DEFAULT_IMAGE_IS_MASTER:
         warn = """This is a development version of cephadm.
 For information regarding the latest stable release:
@@ -1737,12 +1739,12 @@ For information regarding the latest stable release:
     return DEFAULT_IMAGE
 
 
-def infer_image(func):
+def infer_image(func: FuncT) -> FuncT:
     """
     Use the most recent ceph image
     """
     @wraps(func)
-    def _infer_image(ctx: CephadmContext):
+    def _infer_image(ctx: CephadmContext) -> Any:
         if not ctx.image:
             ctx.image = os.environ.get('CEPHADM_IMAGE')
         if not ctx.image:
@@ -1751,12 +1753,12 @@ def infer_image(func):
             ctx.image = _get_default_image(ctx)
         return func(ctx)
 
-    return _infer_image
+    return cast(FuncT, _infer_image)
 
 
-def default_image(func):
+def default_image(func: FuncT) -> FuncT:
     @wraps(func)
-    def _default_image(ctx: CephadmContext):
+    def _default_image(ctx: CephadmContext) -> Any:
         if not ctx.image:
             if 'name' in ctx and ctx.name:
                 type_ = ctx.name.split('.', 1)[0]
@@ -1773,10 +1775,10 @@ def default_image(func):
 
         return func(ctx)
 
-    return _default_image
+    return cast(FuncT, _default_image)
 
 
-def get_last_local_ceph_image(ctx: CephadmContext, container_path: str):
+def get_last_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional[str]:
     """
     :return: The most recent local ceph image (already pulled)
     """
@@ -1935,7 +1937,7 @@ def move_files(ctx, src, dst, uid=None, gid=None):
 
 
 # copied from distutils
-def find_executable(executable, path=None):
+def find_executable(executable: str, path: Optional[str] = None) -> Optional[str]:
     """Tries to find 'executable' in the directories listed in 'path'.
     A string listing directories separated by 'os.pathsep'; defaults to
     os.environ['PATH'].  Returns the complete filename or None if not found.
@@ -1979,7 +1981,7 @@ def find_program(filename):
     return name
 
 
-def find_container_engine(ctx: CephadmContext):
+def find_container_engine(ctx: CephadmContext) -> Optional[ContainerEngine]:
     if ctx.docker:
         return Docker()
     else:
@@ -2013,7 +2015,7 @@ def get_unit_name(fsid, daemon_type, daemon_id=None):
         return 'ceph-%s@%s' % (fsid, daemon_type)
 
 
-def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid, name):
+def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid: str, name: str) -> str:
     daemon = get_daemon_description(ctx, fsid, name)
     try:
         return daemon['systemd_unit']
@@ -2116,7 +2118,7 @@ def get_legacy_daemon_fsid(ctx, cluster,
     return fsid
 
 
-def should_log_to_journald(ctx):
+def should_log_to_journald(ctx: CephadmContext) -> bool:
     if ctx.log_to_journald is not None:
         return ctx.log_to_journald
     return isinstance(ctx.container_engine, Podman) and \
@@ -2667,6 +2669,7 @@ def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
             else:
                 config_js = get_parm(ctx.config_json)
             assert isinstance(config_js, dict)
+            assert isinstance(daemon_id, str)
 
             cephadm_exporter = CephadmDaemon(ctx, fsid, daemon_id, port)
             cephadm_exporter.deploy_daemon_unit(config_js)
@@ -2726,7 +2729,7 @@ def _write_container_cmd_to_bash(ctx, file_obj, container, comment=None, backgro
         + (' &' if background else '') + '\n')
 
 
-def clean_cgroup(ctx: CephadmContext, fsid: str, unit_name: str):
+def clean_cgroup(ctx: CephadmContext, fsid: str, unit_name: str) -> None:
     # systemd may fail to cleanup cgroups from previous stopped unit, which will cause next "systemctl start" to fail.
     # see https://tracker.ceph.com/issues/50998
 
@@ -2740,7 +2743,7 @@ def clean_cgroup(ctx: CephadmContext, fsid: str, unit_name: str):
     if not cg_path.exists():
         return
 
-    def cg_trim(path: Path):
+    def cg_trim(path: Path) -> None:
         for p in path.iterdir():
             if p.is_dir():
                 cg_trim(p)
@@ -3322,7 +3325,7 @@ class CephContainer:
         return ret
 
     def stop_cmd(self):
-        # type () -> List[str]
+        # type: () -> List[str]
         ret = [
             str(self.ctx.container_engine.path),
             'stop', self.cname,
@@ -3407,7 +3410,7 @@ def command_inspect_image(ctx):
     return 0
 
 
-def normalize_image_digest(digest):
+def normalize_image_digest(digest: str) -> str:
     # normal case:
     #   ceph/ceph -> docker.io/ceph/ceph
     # edge cases that shouldn't ever come up:
@@ -3677,7 +3680,7 @@ def prepare_create_mon(
     fsid: str, mon_id: str,
     bootstrap_keyring_path: str,
     monmap_path: str
-):
+) -> Tuple[str, str]:
     logger.info('Creating mon...')
     create_daemon_dirs(ctx, fsid, 'mon', mon_id, uid, gid)
     mon_dir = get_data_dir(fsid, ctx.data_dir, 'mon', mon_id)
@@ -3720,7 +3723,7 @@ def wait_for_mon(
     ctx: CephadmContext,
     mon_id: str, mon_dir: str,
     admin_keyring_path: str, config_path: str
-):
+) -> None:
     logger.info('Waiting for mon to start...')
     c = CephContainer(
         ctx,
@@ -4189,7 +4192,7 @@ def command_bootstrap(ctx):
                 {tmp.name: '/var/lib/ceph/user.conf:z'})
 
     # wait for mgr to restart (after enabling a module)
-    def wait_for_mgr_restart():
+    def wait_for_mgr_restart() -> None:
         # first get latest mgrmap epoch from the mon.  try newer 'mgr
         # stat' command first, then fall back to 'mgr dump' if
         # necessary
@@ -4296,7 +4299,7 @@ def command_bootstrap(ctx):
 ##################################
 
 
-def command_registry_login(ctx: CephadmContext):
+def command_registry_login(ctx: CephadmContext) -> int:
     if ctx.registry_json:
         logger.info('Pulling custom registry login info from %s.' % ctx.registry_json)
         d = get_parm(ctx.registry_json)
@@ -4322,7 +4325,7 @@ def command_registry_login(ctx: CephadmContext):
     return 0
 
 
-def registry_login(ctx: CephadmContext, url, username, password):
+def registry_login(ctx: CephadmContext, url: Optional[str], username: Optional[str], password: Optional[str]) -> None:
     logger.info('Logging into custom registry.')
     try:
         engine = ctx.container_engine
@@ -4734,7 +4737,7 @@ def list_networks(ctx):
     return res
 
 
-def _list_ipv4_networks(ctx: CephadmContext):
+def _list_ipv4_networks(ctx: CephadmContext) -> Dict[str, Dict[str, List[str]]]:
     execstr: Optional[str] = find_executable('ip')
     if not execstr:
         raise FileNotFoundError("unable to find 'ip' command")
@@ -4742,7 +4745,7 @@ def _list_ipv4_networks(ctx: CephadmContext):
     return _parse_ipv4_route(out)
 
 
-def _parse_ipv4_route(out):
+def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, List[str]]]:
     r = {}  # type: Dict[str,Dict[str,List[str]]]
     p = re.compile(r'^(\S+) dev (\S+) (.*)scope link (.*)src (\S+)')
     for line in out.splitlines():
@@ -4760,7 +4763,7 @@ def _parse_ipv4_route(out):
     return r
 
 
-def _list_ipv6_networks(ctx: CephadmContext):
+def _list_ipv6_networks(ctx: CephadmContext) -> Dict[str, Dict[str, List[str]]]:
     execstr: Optional[str] = find_executable('ip')
     if not execstr:
         raise FileNotFoundError("unable to find 'ip' command")
@@ -4769,7 +4772,7 @@ def _list_ipv6_networks(ctx: CephadmContext):
     return _parse_ipv6_route(routes, ips)
 
 
-def _parse_ipv6_route(routes, ips):
+def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, List[str]]]:
     r = {}  # type: Dict[str,Dict[str,List[str]]]
     route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$')
     ip_p = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$')
@@ -5503,7 +5506,7 @@ def command_rm_daemon(ctx):
 ##################################
 
 
-def _zap(ctx, what):
+def _zap(ctx: CephadmContext, what: str) -> None:
     mounts = get_container_mounts(ctx, ctx.fsid, 'clusterless-ceph-volume', None)
     c = CephContainer(
         ctx,
@@ -5519,7 +5522,7 @@ def _zap(ctx, what):
 
 
 @infer_image
-def _zap_osds(ctx):
+def _zap_osds(ctx: CephadmContext) -> None:
     # assume fsid lock already held
 
     # list
@@ -5552,7 +5555,7 @@ def _zap_osds(ctx):
             logger.warning(f'Not zapping LVs (not implemented): {lv_names}')
 
 
-def command_zap_osds(ctx):
+def command_zap_osds(ctx: CephadmContext) -> None:
     if not ctx.force:
         raise Error('must pass --force to proceed: '
                     'this command may destroy precious data!')
@@ -5744,7 +5747,7 @@ def command_prepare_host(ctx: CephadmContext) -> None:
 
 class CustomValidation(argparse.Action):
 
-    def _check_name(self, values):
+    def _check_name(self, values: str) -> None:
         try:
             (daemon_type, daemon_id) = values.split('.', 1)
         except ValueError:
@@ -5757,7 +5760,9 @@ class CustomValidation(argparse.Action):
                                          'name must declare the type of daemon e.g. '
                                          '{}'.format(', '.join(daemons)))
 
-    def __call__(self, parser, namespace, values, option_string=None):
+    def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Union[str, Sequence[Any], None],
+                 option_string: Optional[str] = None) -> None:
+        assert isinstance(values, str)
         if self.dest == 'name':
             self._check_name(values)
             setattr(namespace, self.dest, values)
@@ -5799,7 +5804,8 @@ def get_distro():
 
 class Packager(object):
     def __init__(self, ctx: CephadmContext,
-                 stable=None, version=None, branch=None, commit=None):
+                 stable: Optional[str] = None, version: Optional[str] = None,
+                 branch: Optional[str] = None, commit: Optional[str] = None):
         assert \
             (stable and not version and not branch and not commit) or \
             (not stable and version and not branch and not commit) or \
@@ -5811,13 +5817,19 @@ class Packager(object):
         self.branch = branch
         self.commit = commit
 
-    def add_repo(self):
+    def add_repo(self) -> None:
+        raise NotImplementedError
+
+    def rm_repo(self) -> None:
+        raise NotImplementedError
+
+    def install(self, ls: List[str]) -> None:
         raise NotImplementedError
 
-    def rm_repo(self):
+    def install_podman(self) -> None:
         raise NotImplementedError
 
-    def query_shaman(self, distro, distro_version, branch, commit):
+    def query_shaman(self, distro: str, distro_version: Any, branch: Optional[str], commit: Optional[str]) -> str:
         # query shaman
         logger.info('Fetching repo metadata from shaman and chacra...')
         shaman_url = 'https://shaman.ceph.com/api/repos/ceph/{branch}/{sha1}/{distro}/{distro_version}/repo/?arch={arch}'.format(
@@ -5841,7 +5853,7 @@ class Packager(object):
             raise Error('%s, failed to fetch %s' % (err, chacra_url))
         return chacra_response.read().decode('utf-8')
 
-    def repo_gpgkey(self):
+    def repo_gpgkey(self) -> Tuple[str, str]:
         if self.ctx.gpg_url:
             return self.ctx.gpg_url
         if self.stable or self.version:
@@ -5849,7 +5861,7 @@ class Packager(object):
         else:
             return 'https://download.ceph.com/keys/autobuild.gpg', 'autobuild'
 
-    def enable_service(self, service):
+    def enable_service(self, service: str) -> None:
         """
         Start and enable the service (typically using systemd).
         """
@@ -5863,19 +5875,20 @@ class Apt(Packager):
     }
 
     def __init__(self, ctx: CephadmContext,
-                 stable, version, branch, commit,
-                 distro, distro_version, distro_codename):
+                 stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str],
+                 distro: Optional[str], distro_version: Optional[str], distro_codename: Optional[str]) -> None:
         super(Apt, self).__init__(ctx, stable=stable, version=version,
                                   branch=branch, commit=commit)
+        assert distro
         self.ctx = ctx
         self.distro = self.DISTRO_NAMES[distro]
         self.distro_codename = distro_codename
         self.distro_version = distro_version
 
-    def repo_path(self):
+    def repo_path(self) -> str:
         return '/etc/apt/sources.list.d/ceph.list'
 
-    def add_repo(self):
+    def add_repo(self) -> None:
 
         url, name = self.repo_gpgkey()
         logger.info('Installing repo GPG key from %s...' % url)
@@ -5905,7 +5918,7 @@ class Apt(Packager):
 
         self.update()
 
-    def rm_repo(self):
+    def rm_repo(self) -> None:
         for name in ['autobuild', 'release']:
             p = '/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name
             if os.path.exists(p):
@@ -5918,15 +5931,15 @@ class Apt(Packager):
         if self.distro == 'ubuntu':
             self.rm_kubic_repo()
 
-    def install(self, ls):
+    def install(self, ls: List[str]) -> None:
         logger.info('Installing packages %s...' % ls)
         call_throws(self.ctx, ['apt-get', 'install', '-y'] + ls)
 
-    def update(self):
+    def update(self) -> None:
         logger.info('Updating package list...')
         call_throws(self.ctx, ['apt-get', 'update'])
 
-    def install_podman(self):
+    def install_podman(self) -> None:
         if self.distro == 'ubuntu':
             logger.info('Setting up repo for podman...')
             self.add_kubic_repo()
@@ -5939,20 +5952,20 @@ class Apt(Packager):
             logger.info('Podman did not work.  Falling back to docker...')
             self.install(['docker.io'])
 
-    def kubic_repo_url(self):
+    def kubic_repo_url(self) -> str:
         return 'https://download.opensuse.org/repositories/devel:/kubic:/' \
                'libcontainers:/stable/xUbuntu_%s/' % self.distro_version
 
-    def kubic_repo_path(self):
+    def kubic_repo_path(self) -> str:
         return '/etc/apt/sources.list.d/devel:kubic:libcontainers:stable.list'
 
-    def kubric_repo_gpgkey_url(self):
+    def kubric_repo_gpgkey_url(self) -> str:
         return '%s/Release.key' % self.kubic_repo_url()
 
-    def kubric_repo_gpgkey_path(self):
+    def kubric_repo_gpgkey_path(self) -> str:
         return '/etc/apt/trusted.gpg.d/kubic.release.gpg'
 
-    def add_kubic_repo(self):
+    def add_kubic_repo(self) -> None:
         url = self.kubric_repo_gpgkey_url()
         logger.info('Installing repo GPG key from %s...' % url)
         try:
@@ -5971,7 +5984,7 @@ class Apt(Packager):
         with open(self.kubic_repo_path(), 'w') as f:
             f.write(content)
 
-    def rm_kubic_repo(self):
+    def rm_kubic_repo(self) -> None:
         keyring = self.kubric_repo_gpgkey_path()
         if os.path.exists(keyring):
             logger.info('Removing repo GPG key %s...' % keyring)
@@ -5993,10 +6006,12 @@ class YumDnf(Packager):
     }
 
     def __init__(self, ctx: CephadmContext,
-                 stable, version, branch, commit,
-                 distro, distro_version):
+                 stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str],
+                 distro: Optional[str], distro_version: Optional[str]) -> None:
         super(YumDnf, self).__init__(ctx, stable=stable, version=version,
                                      branch=branch, commit=commit)
+        assert distro
+        assert distro_version
         self.ctx = ctx
         self.major = int(distro_version.split('.')[0])
         self.distro_normalized = self.DISTRO_NAMES[distro][0]
@@ -6007,7 +6022,7 @@ class YumDnf(Packager):
         else:
             self.tool = 'yum'
 
-    def custom_repo(self, **kw):
+    def custom_repo(self, **kw: Any) -> str:
         """
         Repo files need special care in that a whole line should not be present
         if there is no value for it. Because we were using `format()` we could
@@ -6063,10 +6078,10 @@ class YumDnf(Packager):
 
         return '\n'.join(lines)
 
-    def repo_path(self):
+    def repo_path(self) -> str:
         return '/etc/yum.repos.d/ceph.repo'
 
-    def repo_baseurl(self):
+    def repo_baseurl(self) -> str:
         assert self.stable or self.version
         if self.version:
             return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.version,
@@ -6075,7 +6090,7 @@ class YumDnf(Packager):
             return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.stable,
                                      self.distro_code)
 
-    def add_repo(self):
+    def add_repo(self) -> None:
         if self.distro_code.startswith('fc'):
             raise Error('Ceph team does not build Fedora specific packages and therefore cannot add repos for this distro')
         if self.distro_code == 'el7':
@@ -6111,15 +6126,15 @@ class YumDnf(Packager):
             logger.info('Enabling EPEL...')
             call_throws(self.ctx, [self.tool, 'install', '-y', 'epel-release'])
 
-    def rm_repo(self):
+    def rm_repo(self) -> None:
         if os.path.exists(self.repo_path()):
             os.unlink(self.repo_path())
 
-    def install(self, ls):
+    def install(self, ls: List[str]) -> None:
         logger.info('Installing packages %s...' % ls)
         call_throws(self.ctx, [self.tool, 'install', '-y'] + ls)
 
-    def install_podman(self):
+    def install_podman(self) -> None:
         self.install(['podman'])
 
 
@@ -6131,10 +6146,11 @@ class Zypper(Packager):
     ]
 
     def __init__(self, ctx: CephadmContext,
-                 stable, version, branch, commit,
-                 distro, distro_version):
+                 stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str],
+                 distro: Optional[str], distro_version: Optional[str]) -> None:
         super(Zypper, self).__init__(ctx, stable=stable, version=version,
                                      branch=branch, commit=commit)
+        assert distro is not None
         self.ctx = ctx
         self.tool = 'zypper'
         self.distro = 'opensuse'
@@ -6142,7 +6158,7 @@ class Zypper(Packager):
         if 'tumbleweed' not in distro and distro_version is not None:
             self.distro_version = distro_version
 
-    def custom_repo(self, **kw):
+    def custom_repo(self, **kw: Any) -> str:
         """
         See YumDnf for format explanation.
         """
@@ -6171,10 +6187,10 @@ class Zypper(Packager):
 
         return '\n'.join(lines)
 
-    def repo_path(self):
+    def repo_path(self) -> str:
         return '/etc/zypp/repos.d/ceph.repo'
 
-    def repo_baseurl(self):
+    def repo_baseurl(self) -> str:
         assert self.stable or self.version
         if self.version:
             return '%s/rpm-%s/%s' % (self.ctx.repo_url,
@@ -6183,7 +6199,7 @@ class Zypper(Packager):
             return '%s/rpm-%s/%s' % (self.ctx.repo_url,
                                      self.stable, self.distro)
 
-    def add_repo(self):
+    def add_repo(self) -> None:
         if self.stable or self.version:
             content = ''
             for n, t in {
@@ -6208,20 +6224,21 @@ class Zypper(Packager):
         with open(self.repo_path(), 'w') as f:
             f.write(content)
 
-    def rm_repo(self):
+    def rm_repo(self) -> None:
         if os.path.exists(self.repo_path()):
             os.unlink(self.repo_path())
 
-    def install(self, ls):
+    def install(self, ls: List[str]) -> None:
         logger.info('Installing packages %s...' % ls)
         call_throws(self.ctx, [self.tool, 'in', '-y'] + ls)
 
-    def install_podman(self):
+    def install_podman(self) -> None:
         self.install(['podman'])
 
 
 def create_packager(ctx: CephadmContext,
-                    stable=None, version=None, branch=None, commit=None):
+                    stable: Optional[str] = None, version: Optional[str] = None,
+                    branch: Optional[str] = None, commit: Optional[str] = None) -> Packager:
     distro, distro_version, distro_codename = get_distro()
     if distro in YumDnf.DISTRO_NAMES:
         return YumDnf(ctx, stable=stable, version=version,
@@ -6239,7 +6256,7 @@ def create_packager(ctx: CephadmContext,
     raise Error('Distro %s version %s not supported' % (distro, distro_version))
 
 
-def command_add_repo(ctx: CephadmContext):
+def command_add_repo(ctx: CephadmContext) -> None:
     if ctx.version and ctx.release:
         raise Error('you can specify either --release or --version but not both')
     if not ctx.version and not ctx.release and not ctx.dev and not ctx.dev_commit:
@@ -6261,12 +6278,12 @@ def command_add_repo(ctx: CephadmContext):
     logger.info('Completed adding repo.')
 
 
-def command_rm_repo(ctx: CephadmContext):
+def command_rm_repo(ctx: CephadmContext) -> None:
     pkg = create_packager(ctx)
     pkg.rm_repo()
 
 
-def command_install(ctx: CephadmContext):
+def command_install(ctx: CephadmContext) -> None:
     pkg = create_packager(ctx)
     pkg.install(ctx.packages)
 
@@ -6275,7 +6292,7 @@ def command_install(ctx: CephadmContext):
 
 def get_ipv4_address(ifname):
     # type: (str) -> str
-    def _extract(sock, offset):
+    def _extract(sock: socket.socket, offset: int) -> str:
         return socket.inet_ntop(
             socket.AF_INET,
             fcntl.ioctl(
@@ -6804,7 +6821,7 @@ class HostFacts():
         }
 
     @property
-    def selinux_enabled(self):
+    def selinux_enabled(self) -> bool:
         return (self.kernel_security['type'] == 'SELinux') and \
                (self.kernel_security['description'] != 'SELinux: Disabled')
 
@@ -6838,7 +6855,7 @@ class HostFacts():
 ##################################
 
 
-def command_gather_facts(ctx: CephadmContext):
+def command_gather_facts(ctx: CephadmContext) -> None:
     """gather_facts is intended to provide host releated metadata to the caller"""
     host = HostFacts(ctx)
     print(host.dump())
@@ -6850,7 +6867,7 @@ def command_gather_facts(ctx: CephadmContext):
 class CephadmCache:
     task_types = ['disks', 'daemons', 'host', 'http_server']
 
-    def __init__(self):
+    def __init__(self) -> None:
         self.started_epoch_secs = time.time()
         self.tasks = {
             'daemons': 'inactive',
@@ -6858,21 +6875,21 @@ class CephadmCache:
             'host': 'inactive',
             'http_server': 'inactive',
         }
-        self.errors = []
-        self.disks = {}
-        self.daemons = {}
-        self.host = {}
+        self.errors: list = []
+        self.disks: dict = {}
+        self.daemons: dict = {}
+        self.host: dict = {}
         self.lock = RLock()
 
     @property
-    def health(self):
+    def health(self) -> dict:
         return {
             'started_epoch_secs': self.started_epoch_secs,
             'tasks': self.tasks,
             'errors': self.errors,
         }
 
-    def to_json(self):
+    def to_json(self) -> dict:
         return {
             'health': self.health,
             'host': self.host,
@@ -6880,14 +6897,14 @@ class CephadmCache:
             'disks': self.disks,
         }
 
-    def update_health(self, task_type, task_status, error_msg=None):
+    def update_health(self, task_type: str, task_status: str, error_msg: Optional[str] = None) -> None:
         assert task_type in CephadmCache.task_types
         with self.lock:
             self.tasks[task_type] = task_status
             if error_msg:
                 self.errors.append(error_msg)
 
-    def update_task(self, task_type, content):
+    def update_task(self, task_type: str, content: dict) -> None:
         assert task_type in CephadmCache.task_types
         assert isinstance(content, dict)
         with self.lock:
@@ -6918,14 +6935,14 @@ class CephadmDaemonHandler(BaseHTTPRequestHandler):
 
     class Decorators:
         @classmethod
-        def authorize(cls, f):
+        def authorize(cls, f: Any) -> Any:
             """Implement a basic token check.
 
             The token is installed at deployment time and must be provided to
             ensure we only respond to callers who know our token i.e. mgr
             """
 
-            def wrapper(self, *args, **kwargs):
+            def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
                 auth = self.headers.get('Authorization', None)
                 if auth != 'Bearer ' + self.server.token:
                     self.send_error(401)
@@ -6934,7 +6951,7 @@ class CephadmDaemonHandler(BaseHTTPRequestHandler):
 
             return wrapper
 
-    def _help_page(self):
+    def _help_page(self) -> str:
         return """<!DOCTYPE html>
 <html>
 <head><title>cephadm metadata exporter</title></head>
@@ -6970,14 +6987,14 @@ td,th {{
 </body>
 </html>""".format(api_version=CephadmDaemonHandler.api_version)
 
-    def _fetch_root(self):
+    def _fetch_root(self) -> None:
         self.send_response(200)
         self.send_header('Content-type', 'text/html; charset=utf-8')
         self.end_headers()
         self.wfile.write(self._help_page().encode('utf-8'))
 
     @Decorators.authorize
-    def do_GET(self):
+    def do_GET(self) -> None:
         """Handle *all* GET requests"""
 
         if self.path == '/':
@@ -7035,7 +7052,7 @@ td,th {{
             self.end_headers()
             self.wfile.write(json.dumps({'message': bad_request_msg}).encode('utf-8'))
 
-    def log_message(self, format, *args):
+    def log_message(self, format: str, *args: Any) -> None:
         rqst = ' '.join(str(a) for a in args)
         logger.info(f'client:{self.address_string()} [{self.log_date_time_string()}] {rqst}')
 
@@ -7055,7 +7072,7 @@ class CephadmDaemon():
     loop_delay = 1
     thread_check_interval = 5
 
-    def __init__(self, ctx: CephadmContext, fsid, daemon_id=None, port=None):
+    def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Optional[str] = None, port: Optional[int] = None) -> None:
         self.ctx = ctx
         self.fsid = fsid
         self.daemon_id = daemon_id
@@ -7071,7 +7088,7 @@ class CephadmDaemon():
         self.token = read_file([os.path.join(self.daemon_path, CephadmDaemon.token_name)])
 
     @classmethod
-    def validate_config(cls, config):
+    def validate_config(cls, config: dict) -> None:
         reqs = ', '.join(CephadmDaemon.config_requirements)
         errors = []
 
@@ -7104,11 +7121,11 @@ class CephadmDaemon():
             raise Error('Parameter errors : {}'.format(', '.join(errors)))
 
     @property
-    def port_active(self):
+    def port_active(self) -> bool:
         return port_in_use(self.ctx, self.port)
 
     @property
-    def can_run(self):
+    def can_run(self) -> bool:
         # if port is in use
         if self.port_active:
             self.errors.append(f'TCP port {self.port} already in use, unable to bind')
@@ -7121,15 +7138,16 @@ class CephadmDaemon():
         return len(self.errors) == 0
 
     @staticmethod
-    def _unit_name(fsid, daemon_id):
+    def _unit_name(fsid: str, daemon_id: str) -> str:
         return '{}.service'.format(get_unit_name(fsid, CephadmDaemon.daemon_type, daemon_id))
 
     @property
-    def unit_name(self):
+    def unit_name(self) -> str:
+        assert self.daemon_id is not None
         return CephadmDaemon._unit_name(self.fsid, self.daemon_id)
 
     @property
-    def daemon_path(self):
+    def daemon_path(self) -> str:
         return os.path.join(
             self.ctx.data_dir,
             self.fsid,
@@ -7137,12 +7155,12 @@ class CephadmDaemon():
         )
 
     @property
-    def binary_path(self):
+    def binary_path(self) -> str:
         path = os.path.realpath(__file__)
         assert os.path.isfile(path)
         return path
 
-    def _handle_thread_exception(self, exc, thread_type):
+    def _handle_thread_exception(self, exc: Exception, thread_type: str) -> None:
         e_msg = f'{exc.__class__.__name__} exception: {str(exc)}'
         thread_info = getattr(self.cephadm_cache, thread_type)
         errors = thread_info.get('scrape_errors', [])
@@ -7157,7 +7175,7 @@ class CephadmDaemon():
             }
         )
 
-    def _scrape_host_facts(self, refresh_interval=10):
+    def _scrape_host_facts(self, refresh_interval: int = 10) -> None:
         ctr = 0
         exception_encountered = False
 
@@ -7200,7 +7218,7 @@ class CephadmDaemon():
             ctr += CephadmDaemon.loop_delay
         logger.info('host-facts thread stopped')
 
-    def _scrape_ceph_volume(self, refresh_interval=15):
+    def _scrape_ceph_volume(self, refresh_interval: int = 15) -> None:
         # we're invoking the ceph_volume command, so we need to set the args that it
         # expects to use
         self.ctx.command = 'inventory --format=json'.split()
@@ -7259,7 +7277,7 @@ class CephadmDaemon():
 
         logger.info('ceph-volume thread stopped')
 
-    def _scrape_list_daemons(self, refresh_interval=20):
+    def _scrape_list_daemons(self, refresh_interval: int = 20) -> None:
         ctr = 0
         exception_encountered = False
         while True:
@@ -7299,7 +7317,7 @@ class CephadmDaemon():
             ctr += CephadmDaemon.loop_delay
         logger.info('list-daemons thread stopped')
 
-    def _create_thread(self, target, name, refresh_interval=None):
+    def _create_thread(self, target: Any, name: str, refresh_interval: Optional[int] = None) -> Thread:
         if refresh_interval:
             t = Thread(target=target, args=(refresh_interval,))
         else:
@@ -7316,7 +7334,7 @@ class CephadmDaemon():
             logger.info(f'{start_msg}')
         return t
 
-    def reload(self, *args):
+    def reload(self, *args: Any) -> None:
         """reload -HUP received
 
         This is a placeholder function only, and serves to provide the hook that could
@@ -7324,12 +7342,12 @@ class CephadmDaemon():
         """
         logger.info('Reload request received - ignoring, no action needed')
 
-    def shutdown(self, *args):
+    def shutdown(self, *args: Any) -> None:
         logger.info('Shutdown request received')
         self.stop = True
         self.http_server.shutdown()
 
-    def run(self):
+    def run(self) -> None:
         logger.info(f"cephadm exporter starting for FSID '{self.fsid}'")
         if not self.can_run:
             logger.error('Unable to start the exporter daemon')
@@ -7384,7 +7402,7 @@ class CephadmDaemon():
         logger.info('Main http server thread stopped')
 
     @property
-    def unit_run(self):
+    def unit_run(self) -> str:
 
         return """set -e
 {py3} {bin_path} exporter --fsid {fsid} --id {daemon_id} --port {port} &""".format(
@@ -7396,7 +7414,7 @@ class CephadmDaemon():
         )
 
     @property
-    def unit_file(self):
+    def unit_file(self) -> str:
         docker = isinstance(self.ctx.container_engine, Docker)
         return """#generated by cephadm
 [Unit]
@@ -7423,7 +7441,7 @@ WantedBy=ceph-{fsid}.target
            docker_after=' docker.service' if docker else '',
            docker_requires='Requires=docker.service\n' if docker else '')
 
-    def deploy_daemon_unit(self, config=None):
+    def deploy_daemon_unit(self, config: Optional[dict] = None) -> None:
         """deploy a specific unit file for cephadm
 
         The normal deploy_daemon_units doesn't apply for this
@@ -7470,7 +7488,7 @@ WantedBy=ceph-{fsid}.target
         call_throws(self.ctx, ['systemctl', 'enable', '--now', self.unit_name])
 
     @classmethod
-    def uninstall(cls, ctx: CephadmContext, fsid, daemon_type, daemon_id):
+    def uninstall(cls, ctx: CephadmContext, fsid: str, daemon_type: str, daemon_id: str) -> None:
         unit_name = CephadmDaemon._unit_name(fsid, daemon_id)
         unit_path = os.path.join(ctx.unit_dir, unit_name)
         unit_run = os.path.join(ctx.data_dir, fsid, f'{daemon_type}.{daemon_id}', 'unit.run')
@@ -7508,7 +7526,7 @@ WantedBy=ceph-{fsid}.target
             stdout, stderr, rc = call(ctx, ['systemctl', 'daemon-reload'])
 
 
-def command_exporter(ctx: CephadmContext):
+def command_exporter(ctx: CephadmContext) -> None:
     exporter = CephadmDaemon(ctx, ctx.fsid, daemon_id=ctx.id, port=ctx.port)
 
     if ctx.fsid not in os.listdir(ctx.data_dir):
@@ -7531,7 +7549,7 @@ def systemd_target_state(target_name: str, subsystem: str = 'ceph') -> bool:
 
 
 @infer_fsid
-def command_maintenance(ctx: CephadmContext):
+def command_maintenance(ctx: CephadmContext) -> str:
     if not ctx.fsid:
         raise Error('must pass --fsid to specify cluster')
 
@@ -7580,6 +7598,7 @@ def command_maintenance(ctx: CephadmContext):
                     return 'failed - unable to start the target'
                 else:
                     return f'success - systemd target {target} enabled and started'
+        return f'success - systemd target {target} enabled and started'
 
 ##################################
 
@@ -8223,7 +8242,7 @@ def _get_parser():
     return parser
 
 
-def _parse_args(av):
+def _parse_args(av: List[str]) -> argparse.Namespace:
     parser = _get_parser()
 
     args = parser.parse_args(av)
@@ -8280,7 +8299,7 @@ def cephadm_init(args: List[str]) -> CephadmContext:
     return ctx
 
 
-def main():
+def main() -> None:
 
     # root?
     if os.geteuid() != 0: