self.vendor = read_file([os.path.join(self._dev_path, 'vendor')])
self.model = read_file([os.path.join(self._dev_path, 'model')])
- self.components = read_file([os.path.join(self._enc_path, 'components')])
+ self.components = read_file(
+ [os.path.join(self._enc_path, 'components')]
+ )
slot_paths = glob(os.path.join(self._enc_path, '*', 'slot'))
for slot_path in slot_paths:
slot = read_file([slot_path])
- serial_path = os.path.join(os.path.dirname(slot_path), 'device', 'vpd_pg80')
+ serial_path = os.path.join(
+ os.path.dirname(slot_path), 'device', 'vpd_pg80'
+ )
serial = ''
if os.path.exists(serial_path):
serial_raw = read_file([serial_path])
- serial = (''.join(char for char in serial_raw if char in string.printable)).strip()
+ serial = (
+ ''.join(
+ char
+ for char in serial_raw
+ if char in string.printable
+ )
+ ).strip()
self.device_lookup[serial] = slot
slot_dir = os.path.dirname(slot_path)
self.slot_map[slot] = {
def _dump(self) -> Dict[str, Any]:
"""Return a dict representation of the object"""
- return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
+ return {
+ k: v for k, v in self.__dict__.items() if not k.startswith('_')
+ }
def __str__(self) -> str:
"""Return a formatted json representation of the object as a string"""
return self._dump()
-class HostFacts():
+class HostFacts:
_dmi_path_list = ['/sys/class/dmi/id']
_nic_path_list = ['/sys/class/net']
_apparmor_path_list = ['/etc/apparmor']
- _disk_vendor_workarounds = {
- '0x1af4': 'Virtio Block Device'
- }
+ _disk_vendor_workarounds = {'0x1af4': 'Virtio Block Device'}
_excluded_block_devices = ('sr', 'zram', 'dm-', 'loop', 'md')
_sg_generic_glob = '/sys/class/scsi_generic/*'
def _populate_sysctl_options(self) -> Dict[str, str]:
sysctl_options = {}
- out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
+ out, _, _ = call_throws(
+ self.ctx,
+ ['sysctl', '-a'],
+ verbosity=CallVerbosity.QUIET_UNLESS_ERROR,
+ )
if out:
for line in out.splitlines():
option, value = line.split('=')
def _get_block_devs(self):
# type: () -> List[str]
"""Determine the list of block devices by looking at /sys/block"""
- return [dev for dev in os.listdir('/sys/block')
- if not dev.startswith(HostFacts._excluded_block_devices)]
+ return [
+ dev
+ for dev in os.listdir('/sys/block')
+ if not dev.startswith(HostFacts._excluded_block_devices)
+ ]
@property
def operating_system(self):
def subscribed(self):
# type: () -> str
"""Highlevel check to see if the host is subscribed to receive updates/support"""
+
def _red_hat():
# type: () -> str
# RHEL 7 and RHEL 8
scsi_addr = ''
mpath = ''
- disk_model = read_file(['/sys/block/{}/device/model'.format(dev)]).strip()
- disk_rev = read_file(['/sys/block/{}/device/rev'.format(dev)]).strip()
- disk_wwid = read_file(['/sys/block/{}/device/wwid'.format(dev)]).strip()
- vendor = read_file(['/sys/block/{}/device/vendor'.format(dev)]).strip()
- rotational = read_file(['/sys/block/{}/queue/rotational'.format(dev)])
+ disk_model = read_file(
+ ['/sys/block/{}/device/model'.format(dev)]
+ ).strip()
+ disk_rev = read_file(
+ ['/sys/block/{}/device/rev'.format(dev)]
+ ).strip()
+ disk_wwid = read_file(
+ ['/sys/block/{}/device/wwid'.format(dev)]
+ ).strip()
+ vendor = read_file(
+ ['/sys/block/{}/device/vendor'.format(dev)]
+ ).strip()
+ rotational = read_file(
+ ['/sys/block/{}/queue/rotational'.format(dev)]
+ )
holders_raw = glob('/sys/block/{}/holders/*'.format(dev))
if len(holders_raw) == 1:
# mpath will have 1 holder entry
scsi_addr = os.path.basename(scsi_addr_path[0])
# vpd_pg80 isn't guaranteed (libvirt, vmware for example)
- serial_raw = read_file(['/sys/block/{}/device/vpd_pg80'.format(dev)])
- serial = (''.join(i for i in serial_raw if i in string.printable)).strip()
+ serial_raw = read_file(
+ ['/sys/block/{}/device/vpd_pg80'.format(dev)]
+ )
+ serial = (
+ ''.join(i for i in serial_raw if i in string.printable)
+ ).strip()
if serial.lower() == 'unknown':
serial = ''
else:
enclosure_id = enc_id
enclosure_slot = enclosure.device_lookup[serial]
- disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor)
+ disk_vendor = HostFacts._disk_vendor_workarounds.get(
+ vendor, vendor
+ )
disk_size_bytes = self._get_capacity(dev)
- disk_list.append({
- 'description': '{} {} ({})'.format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)),
- 'vendor': disk_vendor,
- 'model': disk_model,
- 'rev': disk_rev,
- 'wwid': disk_wwid,
- 'dev_name': dev,
- 'disk_size_bytes': disk_size_bytes,
- 'disk_type': disk_type,
- 'serial': serial,
- 'alt_dev_name': '',
- 'scsi_addr': scsi_addr,
- 'enclosure_id': enclosure_id,
- 'enclosure_slot': enclosure_slot,
- 'path_id': disk_path_map.get(dev, ''),
- 'mpath': mpath,
- })
+ disk_list.append(
+ {
+ 'description': '{} {} ({})'.format(
+ disk_vendor,
+ disk_model,
+ bytes_to_human(disk_size_bytes),
+ ),
+ 'vendor': disk_vendor,
+ 'model': disk_model,
+ 'rev': disk_rev,
+ 'wwid': disk_wwid,
+ 'dev_name': dev,
+ 'disk_size_bytes': disk_size_bytes,
+ 'disk_type': disk_type,
+ 'serial': serial,
+ 'alt_dev_name': '',
+ 'scsi_addr': scsi_addr,
+ 'enclosure_id': enclosure_id,
+ 'enclosure_slot': enclosure_slot,
+ 'path_id': disk_path_map.get(dev, ''),
+ 'mpath': mpath,
+ }
+ )
# process the devices to drop duplicate physical devs based on matching
# the unique serial number
def flash_list(self):
# type: () -> List[Dict[str, object]]
"""Return a list of devices that are flash based (SSD, NVMe)"""
- return [dev for dev in self._device_list if dev['disk_type'] == 'flash']
+ return [
+ dev for dev in self._device_list if dev['disk_type'] == 'flash'
+ ]
@property
def hdd_capacity_bytes(self):
if not os.path.exists(nic_path):
continue
for iface in os.listdir(nic_path):
-
if os.path.exists(os.path.join(nic_path, iface, 'bridge')):
nic_type = 'bridge'
elif os.path.exists(os.path.join(nic_path, iface, 'bonding')):
nic_type = 'bonding'
else:
- nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown')
+ nic_type = hw_lookup.get(
+ read_file([os.path.join(nic_path, iface, 'type')]),
+ 'Unknown',
+ )
if nic_type == 'loopback': # skip loopback devices
continue
- lower_devs_list = [os.path.basename(link.replace('lower_', '')) for link in glob(os.path.join(nic_path, iface, 'lower_*'))]
- upper_devs_list = [os.path.basename(link.replace('upper_', '')) for link in glob(os.path.join(nic_path, iface, 'upper_*'))]
+ lower_devs_list = [
+ os.path.basename(link.replace('lower_', ''))
+ for link in glob(os.path.join(nic_path, iface, 'lower_*'))
+ ]
+ upper_devs_list = [
+ os.path.basename(link.replace('upper_', ''))
+ for link in glob(os.path.join(nic_path, iface, 'upper_*'))
+ ]
try:
- mtu = int(read_file([os.path.join(nic_path, iface, 'mtu')]))
+ mtu = int(
+ read_file([os.path.join(nic_path, iface, 'mtu')])
+ )
except ValueError:
mtu = 0
- operstate = read_file([os.path.join(nic_path, iface, 'operstate')])
+ operstate = read_file(
+ [os.path.join(nic_path, iface, 'operstate')]
+ )
try:
- speed = int(read_file([os.path.join(nic_path, iface, 'speed')]))
+ speed = int(
+ read_file([os.path.join(nic_path, iface, 'speed')])
+ )
except (OSError, ValueError):
# OSError : device doesn't support the ethtool get_link_ksettings
# ValueError : raised when the read fails, and returns Unknown
iftype = 'physical'
driver_path = os.path.join(dev_link, 'driver')
if os.path.exists(driver_path):
- driver = os.path.basename(os.path.realpath(driver_path))
+ driver = os.path.basename(
+ os.path.realpath(driver_path)
+ )
else:
driver = 'Unknown'
def kernel_security(self):
# type: () -> Dict[str, str]
"""Determine the security features enabled in the kernel - SELinux, AppArmor"""
+
def _fetch_selinux() -> Dict[str, str]:
"""Get the selinux status"""
security = {}
try:
- out, err, code = call(self.ctx, ['sestatus'],
- verbosity=CallVerbosity.QUIET)
+ out, err, code = call(
+ self.ctx, ['sestatus'], verbosity=CallVerbosity.QUIET
+ )
security['type'] = 'SELinux'
status, mode, policy = '', '', ''
for line in out.split('\n'):
if status == 'disabled':
security['description'] = 'SELinux: Disabled'
else:
- security['description'] = 'SELinux: Enabled({}, {})'.format(mode, policy)
+ security[
+ 'description'
+ ] = 'SELinux: Enabled({}, {})'.format(mode, policy)
except Exception as e:
logger.info('unable to get selinux status: %s' % e)
return security
security['type'] = 'AppArmor'
security['description'] = 'AppArmor: Enabled'
try:
- profiles = read_file(['/sys/kernel/security/apparmor/profiles'])
+ profiles = read_file(
+ ['/sys/kernel/security/apparmor/profiles']
+ )
if len(profiles) == 0:
return {}
except OSError:
summary[mode] += 1
else:
summary[mode] = 0
- summary_str = ','.join(['{} {}'.format(v, k) for k, v in summary.items()])
+ summary_str = ','.join(
+ ['{} {}'.format(v, k) for k, v in summary.items()]
+ )
security = {**security, **summary} # type: ignore
security['description'] += '({})'.format(summary_str)
else:
return {
'type': 'Unknown',
- 'description': 'Linux Security Module framework is active, but is not using SELinux or AppArmor'
+ 'description': 'Linux Security Module framework is active, but is not using SELinux or AppArmor',
}
if ret:
return {
'type': 'None',
- 'description': 'Linux Security Module framework is not available'
+ 'description': 'Linux Security Module framework is not available',
}
@property
def selinux_enabled(self) -> bool:
- return (self.kernel_security['type'] == 'SELinux') and \
- (self.kernel_security['description'] != 'SELinux: Disabled')
+ return (self.kernel_security['type'] == 'SELinux') and (
+ self.kernel_security['description'] != 'SELinux: Disabled'
+ )
@property
def kernel_parameters(self):
"""Get kernel parameters required/used in Ceph clusters"""
k_param = {}
- out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT)
+ out, _, _ = call_throws(
+ self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT
+ )
if out:
param_list = out.split('\n')
- param_dict = {param.split(' = ')[0]: param.split(' = ')[-1] for param in param_list}
+ param_dict = {
+ param.split(' = ')[0]: param.split(' = ')[-1]
+ for param in param_list
+ }
# return only desired parameters
if 'net.ipv4.ip_nonlocal_bind' in param_dict:
- k_param['net.ipv4.ip_nonlocal_bind'] = param_dict['net.ipv4.ip_nonlocal_bind']
+ k_param['net.ipv4.ip_nonlocal_bind'] = param_dict[
+ 'net.ipv4.ip_nonlocal_bind'
+ ]
return k_param
# Connections state documentation
# tcp - https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/net/tcp_states.h
# udp - uses 07 (TCP_CLOSE or UNCONN, since udp is stateless. test with netcat -ul <port>)
- listening_state = {
- 'tcp': '0A',
- 'udp': '07'
- }
+ listening_state = {'tcp': '0A', 'udp': '07'}
if protocol not in listening_state.keys():
return []
# type: () -> str
"""Return the attributes of this HostFacts object as json"""
data = {
- k: getattr(self, k) for k in dir(self)
+ k: getattr(self, k)
+ for k in dir(self)
if not k.startswith('_')
- and isinstance(getattr(self, k), (float, int, str, list, dict, tuple))
+ and isinstance(
+ getattr(self, k), (float, int, str, list, dict, tuple)
+ )
}
return json.dumps(data, indent=2, sort_keys=True)
return res
-def _list_ipv4_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]:
+def _list_ipv4_networks(
+ ctx: CephadmContext,
+) -> Dict[str, Dict[str, Set[str]]]:
execstr: Optional[str] = find_executable('ip')
if not execstr:
raise FileNotFoundError("unable to find 'ip' command")
- out, _, _ = call_throws(ctx, [execstr, 'route', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
+ out, _, _ = call_throws(
+ ctx,
+ [execstr, 'route', 'ls'],
+ verbosity=CallVerbosity.QUIET_UNLESS_ERROR,
+ )
return _parse_ipv4_route(out)
def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, Set[str]]]:
r = {} # type: Dict[str, Dict[str, Set[str]]]
- p = re.compile(r'^(\S+) (?:via \S+)? ?dev (\S+) (.*)scope link (.*)src (\S+)')
+ p = re.compile(
+ r'^(\S+) (?:via \S+)? ?dev (\S+) (.*)scope link (.*)src (\S+)'
+ )
for line in out.splitlines():
m = p.findall(line)
if not m:
return r
-def _list_ipv6_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]:
+def _list_ipv6_networks(
+ ctx: CephadmContext,
+) -> Dict[str, Dict[str, Set[str]]]:
execstr: Optional[str] = find_executable('ip')
if not execstr:
raise FileNotFoundError("unable to find 'ip' command")
- routes, _, _ = call_throws(ctx, [execstr, '-6', 'route', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
- ips, _, _ = call_throws(ctx, [execstr, '-6', 'addr', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
+ routes, _, _ = call_throws(
+ ctx,
+ [execstr, '-6', 'route', 'ls'],
+ verbosity=CallVerbosity.QUIET_UNLESS_ERROR,
+ )
+ ips, _, _ = call_throws(
+ ctx,
+ [execstr, '-6', 'addr', 'ls'],
+ verbosity=CallVerbosity.QUIET_UNLESS_ERROR,
+ )
return _parse_ipv6_route(routes, ips)
-def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, Set[str]]]:
+def _parse_ipv6_route(
+ routes: str, ips: str
+) -> Dict[str, Dict[str, Set[str]]]:
r = {} # type: Dict[str, Dict[str, Set[str]]]
- route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$')
+ route_p = re.compile(
+ r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$'
+ )
ip_p = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$')
iface_p = re.compile(r'^(\d+): (\S+): (.*)$')
for line in routes.splitlines():
continue
ip = m[0][0]
# find the network it belongs to
- net = [n for n in r.keys()
- if ipaddress.ip_address(ip) in ipaddress.ip_network(n)]
+ net = [
+ n
+ for n in r.keys()
+ if ipaddress.ip_address(ip) in ipaddress.ip_network(n)
+ ]
if net and iface in r[net[0]]:
assert iface
r[net[0]][iface].add(ip)