From: Joao Eduardo Luis Date: Wed, 30 Dec 2020 13:23:32 +0000 (+0000) Subject: cephadm: fix linting/types issues X-Git-Tag: v17.0.0~8^2~15 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b3b6639b7f91c3d9463c8be2d970f09785b94630;p=ceph.git cephadm: fix linting/types issues Signed-off-by: Joao Eduardo Luis --- diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index 80b697dc0252..f4ae21875112 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -849,7 +849,7 @@ def dict_get(d: Dict, key: str, default: Any = None, require: bool = False) -> A """ if require and key not in d.keys(): raise Error('{} missing from dict'.format(key)) - return d.get(key, default) + return d.get(key, default) # type: ignore ################################## @@ -1201,6 +1201,7 @@ def call(ctx: CephadmContext, ) for fd in reads: try: + message = str() message_b = os.read(fd, 1024) if isinstance(message_b, bytes): message = message_b.decode('utf-8') @@ -1645,7 +1646,7 @@ def _filter_last_local_ceph_image(ctx, out): def write_tmp(s, uid, gid): - # type: (str, int, int) -> Any + # type: (str, int, int) -> IO[str] tmp_f = tempfile.NamedTemporaryFile(mode='w', prefix='ceph-tmp') os.fchown(tmp_f.fileno(), uid, gid) @@ -3101,6 +3102,7 @@ def command_bootstrap(ctx): # type: (CephadmContext) -> int args = ctx.args + host: Optional[str] = None if not ctx.args.output_config: ctx.args.output_config = os.path.join(ctx.args.output_dir, 'ceph.conf') @@ -3457,7 +3459,7 @@ def command_bootstrap(ctx): is_available(ctx, 'mgr epoch %d' % epoch, mgr_has_latest_epoch) # ssh - host: Optional[str] = None + host = None if not ctx.args.skip_ssh: cli(['config-key', 'set', 'mgr/cephadm/ssh_user', ctx.args.ssh_user]) @@ -4157,7 +4159,7 @@ def command_ls(ctx): def list_daemons(ctx, detail=True, legacy_dir=None): # type: (CephadmContext, bool, Optional[str]) -> List[Dict[str, str]] - host_version = None + host_version: Optional[str] = None ls = [] args = ctx.args container_path = ctx.container_path @@ -5570,18 +5572,18 @@ class HostFacts(): } def __init__(self, ctx: CephadmContext): - self.ctx = ctx - self.cpu_model = 'Unknown' - self.cpu_count = 0 - self.cpu_cores = 0 - self.cpu_threads = 0 - self.interfaces = {} - - self._meminfo = read_file(['/proc/meminfo']).splitlines() + self.ctx: CephadmContext = ctx + self.cpu_model: str = 'Unknown' + self.cpu_count: int = 0 + self.cpu_cores: int = 0 + self.cpu_threads: int = 0 + self.interfaces: Dict[str, Any] = {} + + self._meminfo: List[str] = read_file(['/proc/meminfo']).splitlines() self._get_cpuinfo() self._process_nics() - self.arch = platform.processor() - self.kernel = platform.release() + self.arch: str = platform.processor() + self.kernel: str = platform.release() def _get_cpuinfo(self): # type: () -> None @@ -5967,18 +5969,22 @@ class HostFacts(): return security return None + ret = None if os.path.exists('/sys/kernel/security/lsm'): lsm = read_file(['/sys/kernel/security/lsm']).strip() if 'selinux' in lsm: - return _fetch_selinux() + ret = _fetch_selinux() elif 'apparmor' in lsm: - return _fetch_apparmor() + ret = _fetch_apparmor() else: return { "type": "Unknown", "description": "Linux Security Module framework is active, but is not using SELinux or AppArmor" } + if ret is not None: + return ret + return { "type": "None", "description": "Linux Security Module framework is not available" @@ -6247,11 +6253,11 @@ class CephadmDaemon(): self.port = CephadmDaemon.default_port else: self.port = port - self.workers = [] + self.workers: List[Thread] = [] self.http_server: CephadmHTTPServer self.stop = False self.cephadm_cache = CephadmCache() - self.errors = [] + self.errors: List[str] = [] self.token = read_file([os.path.join(self.daemon_path, CephadmDaemon.token_name)]) @classmethod @@ -6654,7 +6660,8 @@ WantedBy=ceph-{fsid}.target unit_name = CephadmDaemon._unit_name(fsid, daemon_id) unit_path = os.path.join(args.unit_dir, unit_name) unit_run = os.path.join(args.data_dir, fsid, f"{daemon_type}.{daemon_id}", "unit.run") - try: + port = None + try: with open(unit_run, "r") as u: contents = u.read().strip(" &") except OSError: