From 0c78d239a1933338fea14e21fde2638a418bc660 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Tue, 3 Aug 2021 14:38:32 +0200 Subject: [PATCH] cephadm: Add missing type annotations Turnd out `command_maintenance` missing an if-branch that returned None instead of a str Signed-off-by: Sebastian Wagner --- src/cephadm/cephadm | 325 +++++++++++++++++++++++--------------------- 1 file changed, 172 insertions(+), 153 deletions(-) diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index ea9c316425f6f..3d5d462f75252 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -31,7 +31,7 @@ from contextlib import redirect_stdout import ssl from enum import Enum -from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO +from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast import re import uuid @@ -45,6 +45,8 @@ from urllib.error import HTTPError from urllib.request import urlopen from pathlib import Path +FuncT = TypeVar('FuncT', bound=Callable) + # Default container images ----------------------------------------------------- DEFAULT_IMAGE = 'quay.ceph.io/ceph-ci/ceph:master' DEFAULT_IMAGE_IS_MASTER = True @@ -104,7 +106,7 @@ cached_stdin = None class BaseConfig: - def __init__(self): + def __init__(self) -> None: self.image: str = '' self.docker: bool = False self.data_dir: str = DATA_DIR @@ -123,7 +125,7 @@ class BaseConfig: self.container_init: bool = CONTAINER_INIT self.container_engine: Optional[ContainerEngine] = None - def set_from_args(self, args: argparse.Namespace): + def set_from_args(self, args: argparse.Namespace) -> None: argdict: Dict[str, Any] = vars(args) for k, v in argdict.items(): if hasattr(self, k): @@ -132,7 +134,7 @@ class BaseConfig: class CephadmContext: - def __init__(self): + def __init__(self) -> None: self.__dict__['_args'] = None self.__dict__['_conf'] = BaseConfig() @@ -164,7 +166,7 @@ class CephadmContext: class ContainerEngine: - def __init__(self): + def __init__(self) -> None: self.path = find_program(self.EXE) @property @@ -175,17 +177,17 @@ class ContainerEngine: class Podman(ContainerEngine): EXE = 'podman' - def __init__(self): + def __init__(self) -> None: super().__init__() - self._version = None + self._version: Optional[Tuple[int, ...]] = None @property - def version(self): + def version(self) -> Tuple[int, ...]: if self._version is None: raise RuntimeError('Please call `get_version` first') return self._version - def get_version(self, ctx: CephadmContext): + def get_version(self, ctx: CephadmContext) -> None: out, _, _ = call_throws(ctx, [self.path, 'version', '--format', '{{.Client.Version}}']) self._version = _parse_podman_version(out) @@ -703,7 +705,7 @@ class HAproxy(object): cname = '%s-%s' % (cname, desc) return cname - def extract_uid_gid_haproxy(self): + def extract_uid_gid_haproxy(self) -> Tuple[int, int]: # better directory for this? return extract_uid_gid(self.ctx, file_path='/var/lib') @@ -806,7 +808,7 @@ class Keepalived(object): 'net.ipv4.ip_nonlocal_bind = 1', ] - def extract_uid_gid_keepalived(self): + def extract_uid_gid_keepalived(self) -> Tuple[int, int]: # better directory for this? return extract_uid_gid(self.ctx, file_path='/var/lib') @@ -1071,34 +1073,34 @@ class Timeout(TimeoutError): seconds. """ - def __init__(self, lock_file): + def __init__(self, lock_file: str) -> None: """ """ #: The path of the file lock. self.lock_file = lock_file return None - def __str__(self): + def __str__(self) -> str: temp = "The file lock '{}' could not be acquired."\ .format(self.lock_file) return temp class _Acquire_ReturnProxy(object): - def __init__(self, lock): + def __init__(self, lock: 'FileLock') -> None: self.lock = lock return None - def __enter__(self): + def __enter__(self) -> 'FileLock': return self.lock - def __exit__(self, exc_type, exc_value, traceback): + def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: self.lock.release() return None class FileLock(object): - def __init__(self, ctx: CephadmContext, name, timeout=-1): + def __init__(self, ctx: CephadmContext, name: str, timeout: int = -1) -> None: if not os.path.exists(LOCK_DIR): os.mkdir(LOCK_DIR, 0o700) self._lock_file = os.path.join(LOCK_DIR, name + '.lock') @@ -1117,10 +1119,10 @@ class FileLock(object): return None @property - def is_locked(self): + def is_locked(self) -> bool: return self._lock_file_fd is not None - def acquire(self, timeout=None, poll_intervall=0.05): + def acquire(self, timeout: Optional[int] = None, poll_intervall: float = 0.05) -> _Acquire_ReturnProxy: """ Acquires the file lock or fails with a :exc:`Timeout` error. .. code-block:: python @@ -1187,7 +1189,7 @@ class FileLock(object): raise return _Acquire_ReturnProxy(lock=self) - def release(self, force=False): + def release(self, force: bool = False) -> None: """ Releases the file lock. Please note, that the lock is only completly released, if the lock @@ -1211,19 +1213,19 @@ class FileLock(object): return None - def __enter__(self): + def __enter__(self) -> 'FileLock': self.acquire() return self - def __exit__(self, exc_type, exc_value, traceback): + def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: self.release() return None - def __del__(self): + def __del__(self) -> None: self.release(force=True) return None - def _acquire(self): + def _acquire(self) -> None: open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC fd = os.open(self._lock_file, open_mode) @@ -1235,7 +1237,7 @@ class FileLock(object): self._lock_file_fd = fd return None - def _release(self): + def _release(self) -> None: # Do not remove the lockfile: # # https://github.com/benediktschmitt/py-filelock/issues/31 @@ -1277,7 +1279,7 @@ if sys.version_info < (3, 8): on amount of spawn processes. """ - def __init__(self): + def __init__(self) -> None: self._pid_counter = itertools.count(0) self._threads = {} @@ -1385,7 +1387,7 @@ def call(ctx: CephadmContext, desc: Optional[str] = None, verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, timeout: Optional[int] = DEFAULT_TIMEOUT, - **kwargs) -> Tuple[str, str, int]: + **kwargs: Any) -> Tuple[str, str, int]: """ Wrap subprocess.Popen to @@ -1448,7 +1450,7 @@ def call_throws( desc: Optional[str] = None, verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, timeout: Optional[int] = DEFAULT_TIMEOUT, - **kwargs) -> Tuple[str, str, int]: + **kwargs: Any) -> Tuple[str, str, int]: out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs) if ret: raise RuntimeError('Failed command: %s' % ' '.join(command)) @@ -1474,7 +1476,7 @@ def call_timeout(ctx, command, timeout): ################################## -def json_loads_retry(cli_func): +def json_loads_retry(cli_func: Callable[[], str]) -> Any: for sleep_secs in [1, 4, 4]: try: return json.loads(cli_func()) @@ -1579,7 +1581,7 @@ def try_convert_datetime(s): def _parse_podman_version(version_str): # type: (str) -> Tuple[int, ...] - def to_int(val, org_e=None): + def to_int(val: str, org_e: Optional[Exception] = None) -> int: if not val and org_e: raise org_e try: @@ -1643,23 +1645,23 @@ def is_fsid(s): return True -def validate_fsid(func): +def validate_fsid(func: FuncT) -> FuncT: @wraps(func) - def _validate_fsid(ctx: CephadmContext): + def _validate_fsid(ctx: CephadmContext) -> Any: if 'fsid' in ctx and ctx.fsid: if not is_fsid(ctx.fsid): raise Error('not an fsid: %s' % ctx.fsid) return func(ctx) - return _validate_fsid + return cast(FuncT, _validate_fsid) -def infer_fsid(func): +def infer_fsid(func: FuncT) -> FuncT: """ If we only find a single fsid in /var/lib/ceph/*, use that """ @infer_config @wraps(func) - def _infer_fsid(ctx: CephadmContext): + def _infer_fsid(ctx: CephadmContext) -> Any: if 'fsid' in ctx and ctx.fsid: logger.debug('Using specified fsid: %s' % ctx.fsid) return func(ctx) @@ -1693,15 +1695,15 @@ def infer_fsid(func): raise Error('Cannot infer an fsid, one must be specified: %s' % fsids) return func(ctx) - return _infer_fsid + return cast(FuncT, _infer_fsid) -def infer_config(func): +def infer_config(func: FuncT) -> FuncT: """ If we find a MON daemon, use the config from that container """ @wraps(func) - def _infer_config(ctx: CephadmContext): + def _infer_config(ctx: CephadmContext) -> Any: ctx.config = ctx.config if 'config' in ctx else None if ctx.config: logger.debug('Using specified config: %s' % ctx.config) @@ -1723,10 +1725,10 @@ def infer_config(func): ctx.config = SHELL_DEFAULT_CONF return func(ctx) - return _infer_config + return cast(FuncT, _infer_config) -def _get_default_image(ctx: CephadmContext): +def _get_default_image(ctx: CephadmContext) -> str: if DEFAULT_IMAGE_IS_MASTER: warn = """This is a development version of cephadm. For information regarding the latest stable release: @@ -1737,12 +1739,12 @@ For information regarding the latest stable release: return DEFAULT_IMAGE -def infer_image(func): +def infer_image(func: FuncT) -> FuncT: """ Use the most recent ceph image """ @wraps(func) - def _infer_image(ctx: CephadmContext): + def _infer_image(ctx: CephadmContext) -> Any: if not ctx.image: ctx.image = os.environ.get('CEPHADM_IMAGE') if not ctx.image: @@ -1751,12 +1753,12 @@ def infer_image(func): ctx.image = _get_default_image(ctx) return func(ctx) - return _infer_image + return cast(FuncT, _infer_image) -def default_image(func): +def default_image(func: FuncT) -> FuncT: @wraps(func) - def _default_image(ctx: CephadmContext): + def _default_image(ctx: CephadmContext) -> Any: if not ctx.image: if 'name' in ctx and ctx.name: type_ = ctx.name.split('.', 1)[0] @@ -1773,10 +1775,10 @@ def default_image(func): return func(ctx) - return _default_image + return cast(FuncT, _default_image) -def get_last_local_ceph_image(ctx: CephadmContext, container_path: str): +def get_last_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional[str]: """ :return: The most recent local ceph image (already pulled) """ @@ -1935,7 +1937,7 @@ def move_files(ctx, src, dst, uid=None, gid=None): # copied from distutils -def find_executable(executable, path=None): +def find_executable(executable: str, path: Optional[str] = None) -> Optional[str]: """Tries to find 'executable' in the directories listed in 'path'. A string listing directories separated by 'os.pathsep'; defaults to os.environ['PATH']. Returns the complete filename or None if not found. @@ -1979,7 +1981,7 @@ def find_program(filename): return name -def find_container_engine(ctx: CephadmContext): +def find_container_engine(ctx: CephadmContext) -> Optional[ContainerEngine]: if ctx.docker: return Docker() else: @@ -2013,7 +2015,7 @@ def get_unit_name(fsid, daemon_type, daemon_id=None): return 'ceph-%s@%s' % (fsid, daemon_type) -def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid, name): +def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid: str, name: str) -> str: daemon = get_daemon_description(ctx, fsid, name) try: return daemon['systemd_unit'] @@ -2116,7 +2118,7 @@ def get_legacy_daemon_fsid(ctx, cluster, return fsid -def should_log_to_journald(ctx): +def should_log_to_journald(ctx: CephadmContext) -> bool: if ctx.log_to_journald is not None: return ctx.log_to_journald return isinstance(ctx.container_engine, Podman) and \ @@ -2667,6 +2669,7 @@ def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid, else: config_js = get_parm(ctx.config_json) assert isinstance(config_js, dict) + assert isinstance(daemon_id, str) cephadm_exporter = CephadmDaemon(ctx, fsid, daemon_id, port) cephadm_exporter.deploy_daemon_unit(config_js) @@ -2726,7 +2729,7 @@ def _write_container_cmd_to_bash(ctx, file_obj, container, comment=None, backgro + (' &' if background else '') + '\n') -def clean_cgroup(ctx: CephadmContext, fsid: str, unit_name: str): +def clean_cgroup(ctx: CephadmContext, fsid: str, unit_name: str) -> None: # systemd may fail to cleanup cgroups from previous stopped unit, which will cause next "systemctl start" to fail. # see https://tracker.ceph.com/issues/50998 @@ -2740,7 +2743,7 @@ def clean_cgroup(ctx: CephadmContext, fsid: str, unit_name: str): if not cg_path.exists(): return - def cg_trim(path: Path): + def cg_trim(path: Path) -> None: for p in path.iterdir(): if p.is_dir(): cg_trim(p) @@ -3322,7 +3325,7 @@ class CephContainer: return ret def stop_cmd(self): - # type () -> List[str] + # type: () -> List[str] ret = [ str(self.ctx.container_engine.path), 'stop', self.cname, @@ -3407,7 +3410,7 @@ def command_inspect_image(ctx): return 0 -def normalize_image_digest(digest): +def normalize_image_digest(digest: str) -> str: # normal case: # ceph/ceph -> docker.io/ceph/ceph # edge cases that shouldn't ever come up: @@ -3677,7 +3680,7 @@ def prepare_create_mon( fsid: str, mon_id: str, bootstrap_keyring_path: str, monmap_path: str -): +) -> Tuple[str, str]: logger.info('Creating mon...') create_daemon_dirs(ctx, fsid, 'mon', mon_id, uid, gid) mon_dir = get_data_dir(fsid, ctx.data_dir, 'mon', mon_id) @@ -3720,7 +3723,7 @@ def wait_for_mon( ctx: CephadmContext, mon_id: str, mon_dir: str, admin_keyring_path: str, config_path: str -): +) -> None: logger.info('Waiting for mon to start...') c = CephContainer( ctx, @@ -4189,7 +4192,7 @@ def command_bootstrap(ctx): {tmp.name: '/var/lib/ceph/user.conf:z'}) # wait for mgr to restart (after enabling a module) - def wait_for_mgr_restart(): + def wait_for_mgr_restart() -> None: # first get latest mgrmap epoch from the mon. try newer 'mgr # stat' command first, then fall back to 'mgr dump' if # necessary @@ -4296,7 +4299,7 @@ def command_bootstrap(ctx): ################################## -def command_registry_login(ctx: CephadmContext): +def command_registry_login(ctx: CephadmContext) -> int: if ctx.registry_json: logger.info('Pulling custom registry login info from %s.' % ctx.registry_json) d = get_parm(ctx.registry_json) @@ -4322,7 +4325,7 @@ def command_registry_login(ctx: CephadmContext): return 0 -def registry_login(ctx: CephadmContext, url, username, password): +def registry_login(ctx: CephadmContext, url: Optional[str], username: Optional[str], password: Optional[str]) -> None: logger.info('Logging into custom registry.') try: engine = ctx.container_engine @@ -4734,7 +4737,7 @@ def list_networks(ctx): return res -def _list_ipv4_networks(ctx: CephadmContext): +def _list_ipv4_networks(ctx: CephadmContext) -> Dict[str, Dict[str, List[str]]]: execstr: Optional[str] = find_executable('ip') if not execstr: raise FileNotFoundError("unable to find 'ip' command") @@ -4742,7 +4745,7 @@ def _list_ipv4_networks(ctx: CephadmContext): return _parse_ipv4_route(out) -def _parse_ipv4_route(out): +def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, List[str]]]: r = {} # type: Dict[str,Dict[str,List[str]]] p = re.compile(r'^(\S+) dev (\S+) (.*)scope link (.*)src (\S+)') for line in out.splitlines(): @@ -4760,7 +4763,7 @@ def _parse_ipv4_route(out): return r -def _list_ipv6_networks(ctx: CephadmContext): +def _list_ipv6_networks(ctx: CephadmContext) -> Dict[str, Dict[str, List[str]]]: execstr: Optional[str] = find_executable('ip') if not execstr: raise FileNotFoundError("unable to find 'ip' command") @@ -4769,7 +4772,7 @@ def _list_ipv6_networks(ctx: CephadmContext): return _parse_ipv6_route(routes, ips) -def _parse_ipv6_route(routes, ips): +def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, List[str]]]: r = {} # type: Dict[str,Dict[str,List[str]]] route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$') ip_p = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$') @@ -5503,7 +5506,7 @@ def command_rm_daemon(ctx): ################################## -def _zap(ctx, what): +def _zap(ctx: CephadmContext, what: str) -> None: mounts = get_container_mounts(ctx, ctx.fsid, 'clusterless-ceph-volume', None) c = CephContainer( ctx, @@ -5519,7 +5522,7 @@ def _zap(ctx, what): @infer_image -def _zap_osds(ctx): +def _zap_osds(ctx: CephadmContext) -> None: # assume fsid lock already held # list @@ -5552,7 +5555,7 @@ def _zap_osds(ctx): logger.warning(f'Not zapping LVs (not implemented): {lv_names}') -def command_zap_osds(ctx): +def command_zap_osds(ctx: CephadmContext) -> None: if not ctx.force: raise Error('must pass --force to proceed: ' 'this command may destroy precious data!') @@ -5744,7 +5747,7 @@ def command_prepare_host(ctx: CephadmContext) -> None: class CustomValidation(argparse.Action): - def _check_name(self, values): + def _check_name(self, values: str) -> None: try: (daemon_type, daemon_id) = values.split('.', 1) except ValueError: @@ -5757,7 +5760,9 @@ class CustomValidation(argparse.Action): 'name must declare the type of daemon e.g. ' '{}'.format(', '.join(daemons))) - def __call__(self, parser, namespace, values, option_string=None): + def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Union[str, Sequence[Any], None], + option_string: Optional[str] = None) -> None: + assert isinstance(values, str) if self.dest == 'name': self._check_name(values) setattr(namespace, self.dest, values) @@ -5799,7 +5804,8 @@ def get_distro(): class Packager(object): def __init__(self, ctx: CephadmContext, - stable=None, version=None, branch=None, commit=None): + stable: Optional[str] = None, version: Optional[str] = None, + branch: Optional[str] = None, commit: Optional[str] = None): assert \ (stable and not version and not branch and not commit) or \ (not stable and version and not branch and not commit) or \ @@ -5811,13 +5817,19 @@ class Packager(object): self.branch = branch self.commit = commit - def add_repo(self): + def add_repo(self) -> None: + raise NotImplementedError + + def rm_repo(self) -> None: + raise NotImplementedError + + def install(self, ls: List[str]) -> None: raise NotImplementedError - def rm_repo(self): + def install_podman(self) -> None: raise NotImplementedError - def query_shaman(self, distro, distro_version, branch, commit): + def query_shaman(self, distro: str, distro_version: Any, branch: Optional[str], commit: Optional[str]) -> str: # query shaman logger.info('Fetching repo metadata from shaman and chacra...') shaman_url = 'https://shaman.ceph.com/api/repos/ceph/{branch}/{sha1}/{distro}/{distro_version}/repo/?arch={arch}'.format( @@ -5841,7 +5853,7 @@ class Packager(object): raise Error('%s, failed to fetch %s' % (err, chacra_url)) return chacra_response.read().decode('utf-8') - def repo_gpgkey(self): + def repo_gpgkey(self) -> Tuple[str, str]: if self.ctx.gpg_url: return self.ctx.gpg_url if self.stable or self.version: @@ -5849,7 +5861,7 @@ class Packager(object): else: return 'https://download.ceph.com/keys/autobuild.gpg', 'autobuild' - def enable_service(self, service): + def enable_service(self, service: str) -> None: """ Start and enable the service (typically using systemd). """ @@ -5863,19 +5875,20 @@ class Apt(Packager): } def __init__(self, ctx: CephadmContext, - stable, version, branch, commit, - distro, distro_version, distro_codename): + stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str], + distro: Optional[str], distro_version: Optional[str], distro_codename: Optional[str]) -> None: super(Apt, self).__init__(ctx, stable=stable, version=version, branch=branch, commit=commit) + assert distro self.ctx = ctx self.distro = self.DISTRO_NAMES[distro] self.distro_codename = distro_codename self.distro_version = distro_version - def repo_path(self): + def repo_path(self) -> str: return '/etc/apt/sources.list.d/ceph.list' - def add_repo(self): + def add_repo(self) -> None: url, name = self.repo_gpgkey() logger.info('Installing repo GPG key from %s...' % url) @@ -5905,7 +5918,7 @@ class Apt(Packager): self.update() - def rm_repo(self): + def rm_repo(self) -> None: for name in ['autobuild', 'release']: p = '/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name if os.path.exists(p): @@ -5918,15 +5931,15 @@ class Apt(Packager): if self.distro == 'ubuntu': self.rm_kubic_repo() - def install(self, ls): + def install(self, ls: List[str]) -> None: logger.info('Installing packages %s...' % ls) call_throws(self.ctx, ['apt-get', 'install', '-y'] + ls) - def update(self): + def update(self) -> None: logger.info('Updating package list...') call_throws(self.ctx, ['apt-get', 'update']) - def install_podman(self): + def install_podman(self) -> None: if self.distro == 'ubuntu': logger.info('Setting up repo for podman...') self.add_kubic_repo() @@ -5939,20 +5952,20 @@ class Apt(Packager): logger.info('Podman did not work. Falling back to docker...') self.install(['docker.io']) - def kubic_repo_url(self): + def kubic_repo_url(self) -> str: return 'https://download.opensuse.org/repositories/devel:/kubic:/' \ 'libcontainers:/stable/xUbuntu_%s/' % self.distro_version - def kubic_repo_path(self): + def kubic_repo_path(self) -> str: return '/etc/apt/sources.list.d/devel:kubic:libcontainers:stable.list' - def kubric_repo_gpgkey_url(self): + def kubric_repo_gpgkey_url(self) -> str: return '%s/Release.key' % self.kubic_repo_url() - def kubric_repo_gpgkey_path(self): + def kubric_repo_gpgkey_path(self) -> str: return '/etc/apt/trusted.gpg.d/kubic.release.gpg' - def add_kubic_repo(self): + def add_kubic_repo(self) -> None: url = self.kubric_repo_gpgkey_url() logger.info('Installing repo GPG key from %s...' % url) try: @@ -5971,7 +5984,7 @@ class Apt(Packager): with open(self.kubic_repo_path(), 'w') as f: f.write(content) - def rm_kubic_repo(self): + def rm_kubic_repo(self) -> None: keyring = self.kubric_repo_gpgkey_path() if os.path.exists(keyring): logger.info('Removing repo GPG key %s...' % keyring) @@ -5993,10 +6006,12 @@ class YumDnf(Packager): } def __init__(self, ctx: CephadmContext, - stable, version, branch, commit, - distro, distro_version): + stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str], + distro: Optional[str], distro_version: Optional[str]) -> None: super(YumDnf, self).__init__(ctx, stable=stable, version=version, branch=branch, commit=commit) + assert distro + assert distro_version self.ctx = ctx self.major = int(distro_version.split('.')[0]) self.distro_normalized = self.DISTRO_NAMES[distro][0] @@ -6007,7 +6022,7 @@ class YumDnf(Packager): else: self.tool = 'yum' - def custom_repo(self, **kw): + def custom_repo(self, **kw: Any) -> str: """ Repo files need special care in that a whole line should not be present if there is no value for it. Because we were using `format()` we could @@ -6063,10 +6078,10 @@ class YumDnf(Packager): return '\n'.join(lines) - def repo_path(self): + def repo_path(self) -> str: return '/etc/yum.repos.d/ceph.repo' - def repo_baseurl(self): + def repo_baseurl(self) -> str: assert self.stable or self.version if self.version: return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.version, @@ -6075,7 +6090,7 @@ class YumDnf(Packager): return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.stable, self.distro_code) - def add_repo(self): + def add_repo(self) -> None: if self.distro_code.startswith('fc'): raise Error('Ceph team does not build Fedora specific packages and therefore cannot add repos for this distro') if self.distro_code == 'el7': @@ -6111,15 +6126,15 @@ class YumDnf(Packager): logger.info('Enabling EPEL...') call_throws(self.ctx, [self.tool, 'install', '-y', 'epel-release']) - def rm_repo(self): + def rm_repo(self) -> None: if os.path.exists(self.repo_path()): os.unlink(self.repo_path()) - def install(self, ls): + def install(self, ls: List[str]) -> None: logger.info('Installing packages %s...' % ls) call_throws(self.ctx, [self.tool, 'install', '-y'] + ls) - def install_podman(self): + def install_podman(self) -> None: self.install(['podman']) @@ -6131,10 +6146,11 @@ class Zypper(Packager): ] def __init__(self, ctx: CephadmContext, - stable, version, branch, commit, - distro, distro_version): + stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str], + distro: Optional[str], distro_version: Optional[str]) -> None: super(Zypper, self).__init__(ctx, stable=stable, version=version, branch=branch, commit=commit) + assert distro is not None self.ctx = ctx self.tool = 'zypper' self.distro = 'opensuse' @@ -6142,7 +6158,7 @@ class Zypper(Packager): if 'tumbleweed' not in distro and distro_version is not None: self.distro_version = distro_version - def custom_repo(self, **kw): + def custom_repo(self, **kw: Any) -> str: """ See YumDnf for format explanation. """ @@ -6171,10 +6187,10 @@ class Zypper(Packager): return '\n'.join(lines) - def repo_path(self): + def repo_path(self) -> str: return '/etc/zypp/repos.d/ceph.repo' - def repo_baseurl(self): + def repo_baseurl(self) -> str: assert self.stable or self.version if self.version: return '%s/rpm-%s/%s' % (self.ctx.repo_url, @@ -6183,7 +6199,7 @@ class Zypper(Packager): return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.stable, self.distro) - def add_repo(self): + def add_repo(self) -> None: if self.stable or self.version: content = '' for n, t in { @@ -6208,20 +6224,21 @@ class Zypper(Packager): with open(self.repo_path(), 'w') as f: f.write(content) - def rm_repo(self): + def rm_repo(self) -> None: if os.path.exists(self.repo_path()): os.unlink(self.repo_path()) - def install(self, ls): + def install(self, ls: List[str]) -> None: logger.info('Installing packages %s...' % ls) call_throws(self.ctx, [self.tool, 'in', '-y'] + ls) - def install_podman(self): + def install_podman(self) -> None: self.install(['podman']) def create_packager(ctx: CephadmContext, - stable=None, version=None, branch=None, commit=None): + stable: Optional[str] = None, version: Optional[str] = None, + branch: Optional[str] = None, commit: Optional[str] = None) -> Packager: distro, distro_version, distro_codename = get_distro() if distro in YumDnf.DISTRO_NAMES: return YumDnf(ctx, stable=stable, version=version, @@ -6239,7 +6256,7 @@ def create_packager(ctx: CephadmContext, raise Error('Distro %s version %s not supported' % (distro, distro_version)) -def command_add_repo(ctx: CephadmContext): +def command_add_repo(ctx: CephadmContext) -> None: if ctx.version and ctx.release: raise Error('you can specify either --release or --version but not both') if not ctx.version and not ctx.release and not ctx.dev and not ctx.dev_commit: @@ -6261,12 +6278,12 @@ def command_add_repo(ctx: CephadmContext): logger.info('Completed adding repo.') -def command_rm_repo(ctx: CephadmContext): +def command_rm_repo(ctx: CephadmContext) -> None: pkg = create_packager(ctx) pkg.rm_repo() -def command_install(ctx: CephadmContext): +def command_install(ctx: CephadmContext) -> None: pkg = create_packager(ctx) pkg.install(ctx.packages) @@ -6275,7 +6292,7 @@ def command_install(ctx: CephadmContext): def get_ipv4_address(ifname): # type: (str) -> str - def _extract(sock, offset): + def _extract(sock: socket.socket, offset: int) -> str: return socket.inet_ntop( socket.AF_INET, fcntl.ioctl( @@ -6804,7 +6821,7 @@ class HostFacts(): } @property - def selinux_enabled(self): + def selinux_enabled(self) -> bool: return (self.kernel_security['type'] == 'SELinux') and \ (self.kernel_security['description'] != 'SELinux: Disabled') @@ -6838,7 +6855,7 @@ class HostFacts(): ################################## -def command_gather_facts(ctx: CephadmContext): +def command_gather_facts(ctx: CephadmContext) -> None: """gather_facts is intended to provide host releated metadata to the caller""" host = HostFacts(ctx) print(host.dump()) @@ -6850,7 +6867,7 @@ def command_gather_facts(ctx: CephadmContext): class CephadmCache: task_types = ['disks', 'daemons', 'host', 'http_server'] - def __init__(self): + def __init__(self) -> None: self.started_epoch_secs = time.time() self.tasks = { 'daemons': 'inactive', @@ -6858,21 +6875,21 @@ class CephadmCache: 'host': 'inactive', 'http_server': 'inactive', } - self.errors = [] - self.disks = {} - self.daemons = {} - self.host = {} + self.errors: list = [] + self.disks: dict = {} + self.daemons: dict = {} + self.host: dict = {} self.lock = RLock() @property - def health(self): + def health(self) -> dict: return { 'started_epoch_secs': self.started_epoch_secs, 'tasks': self.tasks, 'errors': self.errors, } - def to_json(self): + def to_json(self) -> dict: return { 'health': self.health, 'host': self.host, @@ -6880,14 +6897,14 @@ class CephadmCache: 'disks': self.disks, } - def update_health(self, task_type, task_status, error_msg=None): + def update_health(self, task_type: str, task_status: str, error_msg: Optional[str] = None) -> None: assert task_type in CephadmCache.task_types with self.lock: self.tasks[task_type] = task_status if error_msg: self.errors.append(error_msg) - def update_task(self, task_type, content): + def update_task(self, task_type: str, content: dict) -> None: assert task_type in CephadmCache.task_types assert isinstance(content, dict) with self.lock: @@ -6918,14 +6935,14 @@ class CephadmDaemonHandler(BaseHTTPRequestHandler): class Decorators: @classmethod - def authorize(cls, f): + def authorize(cls, f: Any) -> Any: """Implement a basic token check. The token is installed at deployment time and must be provided to ensure we only respond to callers who know our token i.e. mgr """ - def wrapper(self, *args, **kwargs): + def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: auth = self.headers.get('Authorization', None) if auth != 'Bearer ' + self.server.token: self.send_error(401) @@ -6934,7 +6951,7 @@ class CephadmDaemonHandler(BaseHTTPRequestHandler): return wrapper - def _help_page(self): + def _help_page(self) -> str: return """ cephadm metadata exporter @@ -6970,14 +6987,14 @@ td,th {{ """.format(api_version=CephadmDaemonHandler.api_version) - def _fetch_root(self): + def _fetch_root(self) -> None: self.send_response(200) self.send_header('Content-type', 'text/html; charset=utf-8') self.end_headers() self.wfile.write(self._help_page().encode('utf-8')) @Decorators.authorize - def do_GET(self): + def do_GET(self) -> None: """Handle *all* GET requests""" if self.path == '/': @@ -7035,7 +7052,7 @@ td,th {{ self.end_headers() self.wfile.write(json.dumps({'message': bad_request_msg}).encode('utf-8')) - def log_message(self, format, *args): + def log_message(self, format: str, *args: Any) -> None: rqst = ' '.join(str(a) for a in args) logger.info(f'client:{self.address_string()} [{self.log_date_time_string()}] {rqst}') @@ -7055,7 +7072,7 @@ class CephadmDaemon(): loop_delay = 1 thread_check_interval = 5 - def __init__(self, ctx: CephadmContext, fsid, daemon_id=None, port=None): + def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Optional[str] = None, port: Optional[int] = None) -> None: self.ctx = ctx self.fsid = fsid self.daemon_id = daemon_id @@ -7071,7 +7088,7 @@ class CephadmDaemon(): self.token = read_file([os.path.join(self.daemon_path, CephadmDaemon.token_name)]) @classmethod - def validate_config(cls, config): + def validate_config(cls, config: dict) -> None: reqs = ', '.join(CephadmDaemon.config_requirements) errors = [] @@ -7104,11 +7121,11 @@ class CephadmDaemon(): raise Error('Parameter errors : {}'.format(', '.join(errors))) @property - def port_active(self): + def port_active(self) -> bool: return port_in_use(self.ctx, self.port) @property - def can_run(self): + def can_run(self) -> bool: # if port is in use if self.port_active: self.errors.append(f'TCP port {self.port} already in use, unable to bind') @@ -7121,15 +7138,16 @@ class CephadmDaemon(): return len(self.errors) == 0 @staticmethod - def _unit_name(fsid, daemon_id): + def _unit_name(fsid: str, daemon_id: str) -> str: return '{}.service'.format(get_unit_name(fsid, CephadmDaemon.daemon_type, daemon_id)) @property - def unit_name(self): + def unit_name(self) -> str: + assert self.daemon_id is not None return CephadmDaemon._unit_name(self.fsid, self.daemon_id) @property - def daemon_path(self): + def daemon_path(self) -> str: return os.path.join( self.ctx.data_dir, self.fsid, @@ -7137,12 +7155,12 @@ class CephadmDaemon(): ) @property - def binary_path(self): + def binary_path(self) -> str: path = os.path.realpath(__file__) assert os.path.isfile(path) return path - def _handle_thread_exception(self, exc, thread_type): + def _handle_thread_exception(self, exc: Exception, thread_type: str) -> None: e_msg = f'{exc.__class__.__name__} exception: {str(exc)}' thread_info = getattr(self.cephadm_cache, thread_type) errors = thread_info.get('scrape_errors', []) @@ -7157,7 +7175,7 @@ class CephadmDaemon(): } ) - def _scrape_host_facts(self, refresh_interval=10): + def _scrape_host_facts(self, refresh_interval: int = 10) -> None: ctr = 0 exception_encountered = False @@ -7200,7 +7218,7 @@ class CephadmDaemon(): ctr += CephadmDaemon.loop_delay logger.info('host-facts thread stopped') - def _scrape_ceph_volume(self, refresh_interval=15): + def _scrape_ceph_volume(self, refresh_interval: int = 15) -> None: # we're invoking the ceph_volume command, so we need to set the args that it # expects to use self.ctx.command = 'inventory --format=json'.split() @@ -7259,7 +7277,7 @@ class CephadmDaemon(): logger.info('ceph-volume thread stopped') - def _scrape_list_daemons(self, refresh_interval=20): + def _scrape_list_daemons(self, refresh_interval: int = 20) -> None: ctr = 0 exception_encountered = False while True: @@ -7299,7 +7317,7 @@ class CephadmDaemon(): ctr += CephadmDaemon.loop_delay logger.info('list-daemons thread stopped') - def _create_thread(self, target, name, refresh_interval=None): + def _create_thread(self, target: Any, name: str, refresh_interval: Optional[int] = None) -> Thread: if refresh_interval: t = Thread(target=target, args=(refresh_interval,)) else: @@ -7316,7 +7334,7 @@ class CephadmDaemon(): logger.info(f'{start_msg}') return t - def reload(self, *args): + def reload(self, *args: Any) -> None: """reload -HUP received This is a placeholder function only, and serves to provide the hook that could @@ -7324,12 +7342,12 @@ class CephadmDaemon(): """ logger.info('Reload request received - ignoring, no action needed') - def shutdown(self, *args): + def shutdown(self, *args: Any) -> None: logger.info('Shutdown request received') self.stop = True self.http_server.shutdown() - def run(self): + def run(self) -> None: logger.info(f"cephadm exporter starting for FSID '{self.fsid}'") if not self.can_run: logger.error('Unable to start the exporter daemon') @@ -7384,7 +7402,7 @@ class CephadmDaemon(): logger.info('Main http server thread stopped') @property - def unit_run(self): + def unit_run(self) -> str: return """set -e {py3} {bin_path} exporter --fsid {fsid} --id {daemon_id} --port {port} &""".format( @@ -7396,7 +7414,7 @@ class CephadmDaemon(): ) @property - def unit_file(self): + def unit_file(self) -> str: docker = isinstance(self.ctx.container_engine, Docker) return """#generated by cephadm [Unit] @@ -7423,7 +7441,7 @@ WantedBy=ceph-{fsid}.target docker_after=' docker.service' if docker else '', docker_requires='Requires=docker.service\n' if docker else '') - def deploy_daemon_unit(self, config=None): + def deploy_daemon_unit(self, config: Optional[dict] = None) -> None: """deploy a specific unit file for cephadm The normal deploy_daemon_units doesn't apply for this @@ -7470,7 +7488,7 @@ WantedBy=ceph-{fsid}.target call_throws(self.ctx, ['systemctl', 'enable', '--now', self.unit_name]) @classmethod - def uninstall(cls, ctx: CephadmContext, fsid, daemon_type, daemon_id): + def uninstall(cls, ctx: CephadmContext, fsid: str, daemon_type: str, daemon_id: str) -> None: unit_name = CephadmDaemon._unit_name(fsid, daemon_id) unit_path = os.path.join(ctx.unit_dir, unit_name) unit_run = os.path.join(ctx.data_dir, fsid, f'{daemon_type}.{daemon_id}', 'unit.run') @@ -7508,7 +7526,7 @@ WantedBy=ceph-{fsid}.target stdout, stderr, rc = call(ctx, ['systemctl', 'daemon-reload']) -def command_exporter(ctx: CephadmContext): +def command_exporter(ctx: CephadmContext) -> None: exporter = CephadmDaemon(ctx, ctx.fsid, daemon_id=ctx.id, port=ctx.port) if ctx.fsid not in os.listdir(ctx.data_dir): @@ -7531,7 +7549,7 @@ def systemd_target_state(target_name: str, subsystem: str = 'ceph') -> bool: @infer_fsid -def command_maintenance(ctx: CephadmContext): +def command_maintenance(ctx: CephadmContext) -> str: if not ctx.fsid: raise Error('must pass --fsid to specify cluster') @@ -7580,6 +7598,7 @@ def command_maintenance(ctx: CephadmContext): return 'failed - unable to start the target' else: return f'success - systemd target {target} enabled and started' + return f'success - systemd target {target} enabled and started' ################################## @@ -8223,7 +8242,7 @@ def _get_parser(): return parser -def _parse_args(av): +def _parse_args(av: List[str]) -> argparse.Namespace: parser = _get_parser() args = parser.parse_args(av) @@ -8280,7 +8299,7 @@ def cephadm_init(args: List[str]) -> CephadmContext: return ctx -def main(): +def main() -> None: # root? if os.geteuid() != 0: -- 2.39.5