"""
if require and key not in d.keys():
raise Error('{} missing from dict'.format(key))
- return d.get(key, default)
+ return d.get(key, default) # type: ignore
##################################
)
for fd in reads:
try:
+ message = str()
message_b = os.read(fd, 1024)
if isinstance(message_b, bytes):
message = message_b.decode('utf-8')
def write_tmp(s, uid, gid):
- # type: (str, int, int) -> Any
+ # type: (str, int, int) -> IO[str]
tmp_f = tempfile.NamedTemporaryFile(mode='w',
prefix='ceph-tmp')
os.fchown(tmp_f.fileno(), uid, gid)
# type: (CephadmContext) -> int
args = ctx.args
+ host: Optional[str] = None
if not ctx.args.output_config:
ctx.args.output_config = os.path.join(ctx.args.output_dir, 'ceph.conf')
is_available(ctx, 'mgr epoch %d' % epoch, mgr_has_latest_epoch)
# ssh
- host: Optional[str] = None
+ host = None
if not ctx.args.skip_ssh:
cli(['config-key', 'set', 'mgr/cephadm/ssh_user', ctx.args.ssh_user])
def list_daemons(ctx, detail=True, legacy_dir=None):
# type: (CephadmContext, bool, Optional[str]) -> List[Dict[str, str]]
- host_version = None
+ host_version: Optional[str] = None
ls = []
args = ctx.args
container_path = ctx.container_path
}
def __init__(self, ctx: CephadmContext):
- self.ctx = ctx
- self.cpu_model = 'Unknown'
- self.cpu_count = 0
- self.cpu_cores = 0
- self.cpu_threads = 0
- self.interfaces = {}
-
- self._meminfo = read_file(['/proc/meminfo']).splitlines()
+ self.ctx: CephadmContext = ctx
+ self.cpu_model: str = 'Unknown'
+ self.cpu_count: int = 0
+ self.cpu_cores: int = 0
+ self.cpu_threads: int = 0
+ self.interfaces: Dict[str, Any] = {}
+
+ self._meminfo: List[str] = read_file(['/proc/meminfo']).splitlines()
self._get_cpuinfo()
self._process_nics()
- self.arch = platform.processor()
- self.kernel = platform.release()
+ self.arch: str = platform.processor()
+ self.kernel: str = platform.release()
def _get_cpuinfo(self):
# type: () -> None
return security
return None
+ ret = None
if os.path.exists('/sys/kernel/security/lsm'):
lsm = read_file(['/sys/kernel/security/lsm']).strip()
if 'selinux' in lsm:
- return _fetch_selinux()
+ ret = _fetch_selinux()
elif 'apparmor' in lsm:
- return _fetch_apparmor()
+ ret = _fetch_apparmor()
else:
return {
"type": "Unknown",
"description": "Linux Security Module framework is active, but is not using SELinux or AppArmor"
}
+ if ret is not None:
+ return ret
+
return {
"type": "None",
"description": "Linux Security Module framework is not available"
self.port = CephadmDaemon.default_port
else:
self.port = port
- self.workers = []
+ self.workers: List[Thread] = []
self.http_server: CephadmHTTPServer
self.stop = False
self.cephadm_cache = CephadmCache()
- self.errors = []
+ self.errors: List[str] = []
self.token = read_file([os.path.join(self.daemon_path, CephadmDaemon.token_name)])
@classmethod