import ssl
-try:
- from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
-except ImportError:
- pass
+from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
import re
import uuid
# https://github.com/benediktschmitt/py-filelock/blob/master/filelock.py
# that drops all of the compatibility (this is Unix/Linux only).
-try:
- TimeoutError
-except NameError:
- TimeoutError = OSError
-
-
class Timeout(TimeoutError):
"""
Raised when the lock could not be acquired in *timeout*
message_b = os.read(fd, 1024)
if isinstance(message_b, bytes):
message = message_b.decode('utf-8')
- if isinstance(message_b, str):
+ elif isinstance(message_b, str):
message = message_b
+ else:
+ assert False
if stop and message:
# process has terminated, but have more to read still, so not stopping yet
# (os.read returns '' when it encounters EOF)
config_dir = 'etc/alertmanager'
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'data'), uid, gid, 0o755)
+ else:
+ assert False
# populate the config directory for the component from the config-json
for fname in required_files:
else:
return
+ if not self.cmd:
+ raise RuntimeError("command not defined")
+
out, err, ret = call([self.cmd, '--permanent', '--query-service', svc], verbose_on_failure=False)
if ret:
logger.info('Enabling firewalld service %s in current zone...' % svc)
logger.debug('Not possible to open ports <%s>. firewalld.service is not available' % fw_ports)
return
+ if not self.cmd:
+ raise RuntimeError("command not defined")
+
for port in fw_ports:
tcp_port = str(port) + '/tcp'
out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbose_on_failure=False)
logger.debug('Not possible to close ports <%s>. firewalld.service is not available' % fw_ports)
return
+ if not self.cmd:
+ raise RuntimeError("command not defined")
+
for port in fw_ports:
tcp_port = str(port) + '/tcp'
out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbose_on_failure=False)
if not self.available:
return
+ if not self.cmd:
+ raise RuntimeError("command not defined")
+
call_throws([self.cmd, '--reload'])
is_available('mgr epoch %d' % epoch, mgr_has_latest_epoch)
# ssh
+ host: Optional[str] = None
if not args.skip_ssh:
cli(['config-key', 'set', 'mgr/cephadm/ssh_user', args.ssh_user])
def _list_ipv4_networks():
- out, _, _ = call_throws([find_executable('ip'), 'route', 'ls'])
+ execstr: Optional[str] = find_executable('ip')
+ if not execstr:
+ raise FileNotFoundError("unable to find 'ip' command")
+ out, _, _ = call_throws([execstr, 'route', 'ls'])
return _parse_ipv4_route(out)
def _list_ipv6_networks():
- routes, _, _ = call_throws([find_executable('ip'), '-6', 'route', 'ls'])
- ips, _, _ = call_throws([find_executable('ip'), '-6', 'addr', 'ls'])
+ execstr: Optional[str] = find_executable('ip')
+ if not execstr:
+ raise FileNotFoundError("unable to find 'ip' command")
+ routes, _, _ = call_throws([execstr, '-6', 'route', 'ls'])
+ ips, _, _ = call_throws([execstr, '-6', 'addr', 'ls'])
return _parse_ipv6_route(routes, ips)
cluster, daemon_type, daemon_id,
legacy_dir=legacy_dir)
legacy_unit_name = 'ceph-%s@%s' % (daemon_type, daemon_id)
- i = {
+ val: Dict[str, Any] = {
'style': 'legacy',
'name': '%s.%s' % (daemon_type, daemon_id),
'fsid': fsid if fsid is not None else 'unknown',
'systemd_unit': legacy_unit_name,
}
if detail:
- (i['enabled'], i['state'], _) = check_unit(legacy_unit_name)
+ (val['enabled'], val['state'], _) = check_unit(legacy_unit_name)
if not host_version:
try:
out, err, code = call(['ceph', '-v'])
host_version = out.split(' ')[2]
except Exception:
pass
- i['host_version'] = host_version
- ls.append(i)
+ val['host_version'] = host_version
+ ls.append(val)
elif is_fsid(i):
fsid = str(i) # convince mypy that fsid is a str here
for j in os.listdir(os.path.join(data_dir, i)):
daemon_id)
else:
continue
- i = {
+ val = {
'style': 'cephadm:v1',
'name': name,
'fsid': fsid,
}
if detail:
# get container id
- (i['enabled'], i['state'], _) = check_unit(unit_name)
+ (val['enabled'], val['state'], _) = check_unit(unit_name)
container_id = None
image_name = None
image_id = None
image_name = f.read().strip() or None
except IOError:
pass
- i['container_id'] = container_id
- i['container_image_name'] = image_name
- i['container_image_id'] = image_id
- i['version'] = version
- i['started'] = start_stamp
- i['created'] = get_file_timestamp(
+ val['container_id'] = container_id
+ val['container_image_name'] = image_name
+ val['container_image_id'] = image_id
+ val['version'] = version
+ val['started'] = start_stamp
+ val['created'] = get_file_timestamp(
os.path.join(data_dir, fsid, j, 'unit.created')
)
- i['deployed'] = get_file_timestamp(
+ val['deployed'] = get_file_timestamp(
os.path.join(data_dir, fsid, j, 'unit.image'))
- i['configured'] = get_file_timestamp(
+ val['configured'] = get_file_timestamp(
os.path.join(data_dir, fsid, j, 'unit.configured'))
- ls.append(i)
+ ls.append(val)
return ls
except HTTPError as err:
logger.error('repository not found in shaman (might not be available yet)')
raise Error('%s, failed to fetch %s' % (err, shaman_url))
+ chacra_url = ''
try:
chacra_url = shaman_response.geturl()
chacra_response = urlopen(chacra_url)
return float(up_secs)
def kernel_security(self):
- # type: () -> Dict[str, str]
+ # type: () -> Optional[Dict[str, str]]
"""Determine the security features enabled in the kernel - SELinux, AppArmor"""
- def _fetch_selinux():
+ def _fetch_selinux() -> Optional[Dict[str, str]]:
"""Read the selinux config file to determine state"""
security = {}
for selinux_path in HostFacts._selinux_path_list:
else:
security['description'] = "SELinux: Enabled({}, {})".format(security['SELINUX'], security['SELINUXTYPE'])
return security
+ return None
- def _fetch_apparmor():
+ def _fetch_apparmor() -> Optional[Dict[str, str]]:
"""Read the apparmor profiles directly, returning an overview of AppArmor status"""
security = {}
for apparmor_path in HostFacts._apparmor_path_list:
security['description'] += "({})".format(summary_str)
return security
+ return None
if os.path.exists('/sys/kernel/security/lsm'):
lsm = read_file(['/sys/kernel/security/lsm']).strip()
# json, it won't parse
stdout = stream.getvalue()
+ data = []
if stdout:
try:
data = json.loads(stdout)
except json.decoder.JSONDecodeError:
errors.append("ceph-volume thread provided bad json data")
logger.warning(errors[-1])
- data = []
else:
errors.append("ceph-volume didn't return any data")
logger.warning(errors[-1])
logger.warning(f"Unable to access the unit.run file @ {unit_run}")
return
+ port = None
for line in contents.split('\n'):
if '--port ' in line:
try: