From dec3717b459e20b7a8b28b6a25a4397481b69513 Mon Sep 17 00:00:00 2001 From: Adam King Date: Thu, 21 Sep 2023 16:28:09 -0400 Subject: [PATCH] cephadm: format black host_facts.py Signed-off-by: Adam King --- src/cephadm/cephadmlib/host_facts.py | 245 +++++++++++++++++++-------- 1 file changed, 172 insertions(+), 73 deletions(-) diff --git a/src/cephadm/cephadmlib/host_facts.py b/src/cephadm/cephadmlib/host_facts.py index d8b072227fb..1cfb2ac84d9 100644 --- a/src/cephadm/cephadmlib/host_facts.py +++ b/src/cephadm/cephadmlib/host_facts.py @@ -55,15 +55,25 @@ class Enclosure: self.vendor = read_file([os.path.join(self._dev_path, 'vendor')]) self.model = read_file([os.path.join(self._dev_path, 'model')]) - self.components = read_file([os.path.join(self._enc_path, 'components')]) + self.components = read_file( + [os.path.join(self._enc_path, 'components')] + ) slot_paths = glob(os.path.join(self._enc_path, '*', 'slot')) for slot_path in slot_paths: slot = read_file([slot_path]) - serial_path = os.path.join(os.path.dirname(slot_path), 'device', 'vpd_pg80') + serial_path = os.path.join( + os.path.dirname(slot_path), 'device', 'vpd_pg80' + ) serial = '' if os.path.exists(serial_path): serial_raw = read_file([serial_path]) - serial = (''.join(char for char in serial_raw if char in string.printable)).strip() + serial = ( + ''.join( + char + for char in serial_raw + if char in string.printable + ) + ).strip() self.device_lookup[serial] = slot slot_dir = os.path.dirname(slot_path) self.slot_map[slot] = { @@ -86,7 +96,9 @@ class Enclosure: def _dump(self) -> Dict[str, Any]: """Return a dict representation of the object""" - return {k: v for k, v in self.__dict__.items() if not k.startswith('_')} + return { + k: v for k, v in self.__dict__.items() if not k.startswith('_') + } def __str__(self) -> str: """Return a formatted json representation of the object as a string""" @@ -101,13 +113,11 @@ class Enclosure: return self._dump() -class HostFacts(): +class HostFacts: _dmi_path_list = ['/sys/class/dmi/id'] _nic_path_list = ['/sys/class/net'] _apparmor_path_list = ['/etc/apparmor'] - _disk_vendor_workarounds = { - '0x1af4': 'Virtio Block Device' - } + _disk_vendor_workarounds = {'0x1af4': 'Virtio Block Device'} _excluded_block_devices = ('sr', 'zram', 'dm-', 'loop', 'md') _sg_generic_glob = '/sys/class/scsi_generic/*' @@ -131,7 +141,11 @@ class HostFacts(): def _populate_sysctl_options(self) -> Dict[str, str]: sysctl_options = {} - out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR) + out, _, _ = call_throws( + self.ctx, + ['sysctl', '-a'], + verbosity=CallVerbosity.QUIET_UNLESS_ERROR, + ) if out: for line in out.splitlines(): option, value = line.split('=') @@ -204,8 +218,11 @@ class HostFacts(): def _get_block_devs(self): # type: () -> List[str] """Determine the list of block devices by looking at /sys/block""" - return [dev for dev in os.listdir('/sys/block') - if not dev.startswith(HostFacts._excluded_block_devices)] + return [ + dev + for dev in os.listdir('/sys/block') + if not dev.startswith(HostFacts._excluded_block_devices) + ] @property def operating_system(self): @@ -245,6 +262,7 @@ class HostFacts(): def subscribed(self): # type: () -> str """Highlevel check to see if the host is subscribed to receive updates/support""" + def _red_hat(): # type: () -> str # RHEL 7 and RHEL 8 @@ -320,11 +338,21 @@ class HostFacts(): scsi_addr = '' mpath = '' - disk_model = read_file(['/sys/block/{}/device/model'.format(dev)]).strip() - disk_rev = read_file(['/sys/block/{}/device/rev'.format(dev)]).strip() - disk_wwid = read_file(['/sys/block/{}/device/wwid'.format(dev)]).strip() - vendor = read_file(['/sys/block/{}/device/vendor'.format(dev)]).strip() - rotational = read_file(['/sys/block/{}/queue/rotational'.format(dev)]) + disk_model = read_file( + ['/sys/block/{}/device/model'.format(dev)] + ).strip() + disk_rev = read_file( + ['/sys/block/{}/device/rev'.format(dev)] + ).strip() + disk_wwid = read_file( + ['/sys/block/{}/device/wwid'.format(dev)] + ).strip() + vendor = read_file( + ['/sys/block/{}/device/vendor'.format(dev)] + ).strip() + rotational = read_file( + ['/sys/block/{}/queue/rotational'.format(dev)] + ) holders_raw = glob('/sys/block/{}/holders/*'.format(dev)) if len(holders_raw) == 1: # mpath will have 1 holder entry @@ -337,8 +365,12 @@ class HostFacts(): scsi_addr = os.path.basename(scsi_addr_path[0]) # vpd_pg80 isn't guaranteed (libvirt, vmware for example) - serial_raw = read_file(['/sys/block/{}/device/vpd_pg80'.format(dev)]) - serial = (''.join(i for i in serial_raw if i in string.printable)).strip() + serial_raw = read_file( + ['/sys/block/{}/device/vpd_pg80'.format(dev)] + ) + serial = ( + ''.join(i for i in serial_raw if i in string.printable) + ).strip() if serial.lower() == 'unknown': serial = '' else: @@ -351,25 +383,33 @@ class HostFacts(): enclosure_id = enc_id enclosure_slot = enclosure.device_lookup[serial] - disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor) + disk_vendor = HostFacts._disk_vendor_workarounds.get( + vendor, vendor + ) disk_size_bytes = self._get_capacity(dev) - disk_list.append({ - 'description': '{} {} ({})'.format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)), - 'vendor': disk_vendor, - 'model': disk_model, - 'rev': disk_rev, - 'wwid': disk_wwid, - 'dev_name': dev, - 'disk_size_bytes': disk_size_bytes, - 'disk_type': disk_type, - 'serial': serial, - 'alt_dev_name': '', - 'scsi_addr': scsi_addr, - 'enclosure_id': enclosure_id, - 'enclosure_slot': enclosure_slot, - 'path_id': disk_path_map.get(dev, ''), - 'mpath': mpath, - }) + disk_list.append( + { + 'description': '{} {} ({})'.format( + disk_vendor, + disk_model, + bytes_to_human(disk_size_bytes), + ), + 'vendor': disk_vendor, + 'model': disk_model, + 'rev': disk_rev, + 'wwid': disk_wwid, + 'dev_name': dev, + 'disk_size_bytes': disk_size_bytes, + 'disk_type': disk_type, + 'serial': serial, + 'alt_dev_name': '', + 'scsi_addr': scsi_addr, + 'enclosure_id': enclosure_id, + 'enclosure_slot': enclosure_slot, + 'path_id': disk_path_map.get(dev, ''), + 'mpath': mpath, + } + ) # process the devices to drop duplicate physical devs based on matching # the unique serial number @@ -399,7 +439,9 @@ class HostFacts(): def flash_list(self): # type: () -> List[Dict[str, object]] """Return a list of devices that are flash based (SSD, NVMe)""" - return [dev for dev in self._device_list if dev['disk_type'] == 'flash'] + return [ + dev for dev in self._device_list if dev['disk_type'] == 'flash' + ] @property def hdd_capacity_bytes(self): @@ -457,28 +499,42 @@ class HostFacts(): if not os.path.exists(nic_path): continue for iface in os.listdir(nic_path): - if os.path.exists(os.path.join(nic_path, iface, 'bridge')): nic_type = 'bridge' elif os.path.exists(os.path.join(nic_path, iface, 'bonding')): nic_type = 'bonding' else: - nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown') + nic_type = hw_lookup.get( + read_file([os.path.join(nic_path, iface, 'type')]), + 'Unknown', + ) if nic_type == 'loopback': # skip loopback devices continue - lower_devs_list = [os.path.basename(link.replace('lower_', '')) for link in glob(os.path.join(nic_path, iface, 'lower_*'))] - upper_devs_list = [os.path.basename(link.replace('upper_', '')) for link in glob(os.path.join(nic_path, iface, 'upper_*'))] + lower_devs_list = [ + os.path.basename(link.replace('lower_', '')) + for link in glob(os.path.join(nic_path, iface, 'lower_*')) + ] + upper_devs_list = [ + os.path.basename(link.replace('upper_', '')) + for link in glob(os.path.join(nic_path, iface, 'upper_*')) + ] try: - mtu = int(read_file([os.path.join(nic_path, iface, 'mtu')])) + mtu = int( + read_file([os.path.join(nic_path, iface, 'mtu')]) + ) except ValueError: mtu = 0 - operstate = read_file([os.path.join(nic_path, iface, 'operstate')]) + operstate = read_file( + [os.path.join(nic_path, iface, 'operstate')] + ) try: - speed = int(read_file([os.path.join(nic_path, iface, 'speed')])) + speed = int( + read_file([os.path.join(nic_path, iface, 'speed')]) + ) except (OSError, ValueError): # OSError : device doesn't support the ethtool get_link_ksettings # ValueError : raised when the read fails, and returns Unknown @@ -491,7 +547,9 @@ class HostFacts(): iftype = 'physical' driver_path = os.path.join(dev_link, 'driver') if os.path.exists(driver_path): - driver = os.path.basename(os.path.realpath(driver_path)) + driver = os.path.basename( + os.path.realpath(driver_path) + ) else: driver = 'Unknown' @@ -613,12 +671,14 @@ class HostFacts(): def kernel_security(self): # type: () -> Dict[str, str] """Determine the security features enabled in the kernel - SELinux, AppArmor""" + def _fetch_selinux() -> Dict[str, str]: """Get the selinux status""" security = {} try: - out, err, code = call(self.ctx, ['sestatus'], - verbosity=CallVerbosity.QUIET) + out, err, code = call( + self.ctx, ['sestatus'], verbosity=CallVerbosity.QUIET + ) security['type'] = 'SELinux' status, mode, policy = '', '', '' for line in out.split('\n'): @@ -634,7 +694,9 @@ class HostFacts(): if status == 'disabled': security['description'] = 'SELinux: Disabled' else: - security['description'] = 'SELinux: Enabled({}, {})'.format(mode, policy) + security[ + 'description' + ] = 'SELinux: Enabled({}, {})'.format(mode, policy) except Exception as e: logger.info('unable to get selinux status: %s' % e) return security @@ -647,7 +709,9 @@ class HostFacts(): security['type'] = 'AppArmor' security['description'] = 'AppArmor: Enabled' try: - profiles = read_file(['/sys/kernel/security/apparmor/profiles']) + profiles = read_file( + ['/sys/kernel/security/apparmor/profiles'] + ) if len(profiles) == 0: return {} except OSError: @@ -661,7 +725,9 @@ class HostFacts(): summary[mode] += 1 else: summary[mode] = 0 - summary_str = ','.join(['{} {}'.format(v, k) for k, v in summary.items()]) + summary_str = ','.join( + ['{} {}'.format(v, k) for k, v in summary.items()] + ) security = {**security, **summary} # type: ignore security['description'] += '({})'.format(summary_str) @@ -678,7 +744,7 @@ class HostFacts(): else: return { 'type': 'Unknown', - 'description': 'Linux Security Module framework is active, but is not using SELinux or AppArmor' + 'description': 'Linux Security Module framework is active, but is not using SELinux or AppArmor', } if ret: @@ -686,13 +752,14 @@ class HostFacts(): return { 'type': 'None', - 'description': 'Linux Security Module framework is not available' + 'description': 'Linux Security Module framework is not available', } @property def selinux_enabled(self) -> bool: - return (self.kernel_security['type'] == 'SELinux') and \ - (self.kernel_security['description'] != 'SELinux: Disabled') + return (self.kernel_security['type'] == 'SELinux') and ( + self.kernel_security['description'] != 'SELinux: Disabled' + ) @property def kernel_parameters(self): @@ -700,14 +767,21 @@ class HostFacts(): """Get kernel parameters required/used in Ceph clusters""" k_param = {} - out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT) + out, _, _ = call_throws( + self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT + ) if out: param_list = out.split('\n') - param_dict = {param.split(' = ')[0]: param.split(' = ')[-1] for param in param_list} + param_dict = { + param.split(' = ')[0]: param.split(' = ')[-1] + for param in param_list + } # return only desired parameters if 'net.ipv4.ip_nonlocal_bind' in param_dict: - k_param['net.ipv4.ip_nonlocal_bind'] = param_dict['net.ipv4.ip_nonlocal_bind'] + k_param['net.ipv4.ip_nonlocal_bind'] = param_dict[ + 'net.ipv4.ip_nonlocal_bind' + ] return k_param @@ -717,10 +791,7 @@ class HostFacts(): # Connections state documentation # tcp - https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/net/tcp_states.h # udp - uses 07 (TCP_CLOSE or UNCONN, since udp is stateless. test with netcat -ul ) - listening_state = { - 'tcp': '0A', - 'udp': '07' - } + listening_state = {'tcp': '0A', 'udp': '07'} if protocol not in listening_state.keys(): return [] @@ -757,9 +828,12 @@ class HostFacts(): # type: () -> str """Return the attributes of this HostFacts object as json""" data = { - k: getattr(self, k) for k in dir(self) + k: getattr(self, k) + for k in dir(self) if not k.startswith('_') - and isinstance(getattr(self, k), (float, int, str, list, dict, tuple)) + and isinstance( + getattr(self, k), (float, int, str, list, dict, tuple) + ) } return json.dumps(data, indent=2, sort_keys=True) @@ -778,17 +852,25 @@ def list_networks(ctx): return res -def _list_ipv4_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]: +def _list_ipv4_networks( + ctx: CephadmContext, +) -> Dict[str, Dict[str, Set[str]]]: execstr: Optional[str] = find_executable('ip') if not execstr: raise FileNotFoundError("unable to find 'ip' command") - out, _, _ = call_throws(ctx, [execstr, 'route', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR) + out, _, _ = call_throws( + ctx, + [execstr, 'route', 'ls'], + verbosity=CallVerbosity.QUIET_UNLESS_ERROR, + ) return _parse_ipv4_route(out) def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, Set[str]]]: r = {} # type: Dict[str, Dict[str, Set[str]]] - p = re.compile(r'^(\S+) (?:via \S+)? ?dev (\S+) (.*)scope link (.*)src (\S+)') + p = re.compile( + r'^(\S+) (?:via \S+)? ?dev (\S+) (.*)scope link (.*)src (\S+)' + ) for line in out.splitlines(): m = p.findall(line) if not m: @@ -806,18 +888,32 @@ def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, Set[str]]]: return r -def _list_ipv6_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]: +def _list_ipv6_networks( + ctx: CephadmContext, +) -> Dict[str, Dict[str, Set[str]]]: execstr: Optional[str] = find_executable('ip') if not execstr: raise FileNotFoundError("unable to find 'ip' command") - routes, _, _ = call_throws(ctx, [execstr, '-6', 'route', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR) - ips, _, _ = call_throws(ctx, [execstr, '-6', 'addr', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR) + routes, _, _ = call_throws( + ctx, + [execstr, '-6', 'route', 'ls'], + verbosity=CallVerbosity.QUIET_UNLESS_ERROR, + ) + ips, _, _ = call_throws( + ctx, + [execstr, '-6', 'addr', 'ls'], + verbosity=CallVerbosity.QUIET_UNLESS_ERROR, + ) return _parse_ipv6_route(routes, ips) -def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, Set[str]]]: +def _parse_ipv6_route( + routes: str, ips: str +) -> Dict[str, Dict[str, Set[str]]]: r = {} # type: Dict[str, Dict[str, Set[str]]] - route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$') + route_p = re.compile( + r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$' + ) ip_p = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$') iface_p = re.compile(r'^(\d+): (\S+): (.*)$') for line in routes.splitlines(): @@ -846,8 +942,11 @@ def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, Set[str]]]: continue ip = m[0][0] # find the network it belongs to - net = [n for n in r.keys() - if ipaddress.ip_address(ip) in ipaddress.ip_network(n)] + net = [ + n + for n in r.keys() + if ipaddress.ip_address(ip) in ipaddress.ip_network(n) + ] if net and iface in r[net[0]]: assert iface r[net[0]][iface].add(ip) -- 2.39.5