]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
cephadm: format black host_facts.py
authorAdam King <adking@redhat.com>
Thu, 21 Sep 2023 20:28:09 +0000 (16:28 -0400)
committerAdam King <adking@redhat.com>
Mon, 25 Sep 2023 14:26:53 +0000 (10:26 -0400)
Signed-off-by: Adam King <adking@redhat.com>
src/cephadm/cephadmlib/host_facts.py

index d8b072227fbd11ddfc21a1f71be761b7f7376643..1cfb2ac84d92632da4b04c8529678d7c5d4e5942 100644 (file)
@@ -55,15 +55,25 @@ class Enclosure:
 
         self.vendor = read_file([os.path.join(self._dev_path, 'vendor')])
         self.model = read_file([os.path.join(self._dev_path, 'model')])
-        self.components = read_file([os.path.join(self._enc_path, 'components')])
+        self.components = read_file(
+            [os.path.join(self._enc_path, 'components')]
+        )
         slot_paths = glob(os.path.join(self._enc_path, '*', 'slot'))
         for slot_path in slot_paths:
             slot = read_file([slot_path])
-            serial_path = os.path.join(os.path.dirname(slot_path), 'device', 'vpd_pg80')
+            serial_path = os.path.join(
+                os.path.dirname(slot_path), 'device', 'vpd_pg80'
+            )
             serial = ''
             if os.path.exists(serial_path):
                 serial_raw = read_file([serial_path])
-                serial = (''.join(char for char in serial_raw if char in string.printable)).strip()
+                serial = (
+                    ''.join(
+                        char
+                        for char in serial_raw
+                        if char in string.printable
+                    )
+                ).strip()
                 self.device_lookup[serial] = slot
             slot_dir = os.path.dirname(slot_path)
             self.slot_map[slot] = {
@@ -86,7 +96,9 @@ class Enclosure:
 
     def _dump(self) -> Dict[str, Any]:
         """Return a dict representation of the object"""
-        return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
+        return {
+            k: v for k, v in self.__dict__.items() if not k.startswith('_')
+        }
 
     def __str__(self) -> str:
         """Return a formatted json representation of the object as a string"""
@@ -101,13 +113,11 @@ class Enclosure:
         return self._dump()
 
 
-class HostFacts():
+class HostFacts:
     _dmi_path_list = ['/sys/class/dmi/id']
     _nic_path_list = ['/sys/class/net']
     _apparmor_path_list = ['/etc/apparmor']
-    _disk_vendor_workarounds = {
-        '0x1af4': 'Virtio Block Device'
-    }
+    _disk_vendor_workarounds = {'0x1af4': 'Virtio Block Device'}
     _excluded_block_devices = ('sr', 'zram', 'dm-', 'loop', 'md')
     _sg_generic_glob = '/sys/class/scsi_generic/*'
 
@@ -131,7 +141,11 @@ class HostFacts():
 
     def _populate_sysctl_options(self) -> Dict[str, str]:
         sysctl_options = {}
-        out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
+        out, _, _ = call_throws(
+            self.ctx,
+            ['sysctl', '-a'],
+            verbosity=CallVerbosity.QUIET_UNLESS_ERROR,
+        )
         if out:
             for line in out.splitlines():
                 option, value = line.split('=')
@@ -204,8 +218,11 @@ class HostFacts():
     def _get_block_devs(self):
         # type: () -> List[str]
         """Determine the list of block devices by looking at /sys/block"""
-        return [dev for dev in os.listdir('/sys/block')
-                if not dev.startswith(HostFacts._excluded_block_devices)]
+        return [
+            dev
+            for dev in os.listdir('/sys/block')
+            if not dev.startswith(HostFacts._excluded_block_devices)
+        ]
 
     @property
     def operating_system(self):
@@ -245,6 +262,7 @@ class HostFacts():
     def subscribed(self):
         # type: () -> str
         """Highlevel check to see if the host is subscribed to receive updates/support"""
+
         def _red_hat():
             # type: () -> str
             # RHEL 7 and RHEL 8
@@ -320,11 +338,21 @@ class HostFacts():
             scsi_addr = ''
             mpath = ''
 
-            disk_model = read_file(['/sys/block/{}/device/model'.format(dev)]).strip()
-            disk_rev = read_file(['/sys/block/{}/device/rev'.format(dev)]).strip()
-            disk_wwid = read_file(['/sys/block/{}/device/wwid'.format(dev)]).strip()
-            vendor = read_file(['/sys/block/{}/device/vendor'.format(dev)]).strip()
-            rotational = read_file(['/sys/block/{}/queue/rotational'.format(dev)])
+            disk_model = read_file(
+                ['/sys/block/{}/device/model'.format(dev)]
+            ).strip()
+            disk_rev = read_file(
+                ['/sys/block/{}/device/rev'.format(dev)]
+            ).strip()
+            disk_wwid = read_file(
+                ['/sys/block/{}/device/wwid'.format(dev)]
+            ).strip()
+            vendor = read_file(
+                ['/sys/block/{}/device/vendor'.format(dev)]
+            ).strip()
+            rotational = read_file(
+                ['/sys/block/{}/queue/rotational'.format(dev)]
+            )
             holders_raw = glob('/sys/block/{}/holders/*'.format(dev))
             if len(holders_raw) == 1:
                 # mpath will have 1 holder entry
@@ -337,8 +365,12 @@ class HostFacts():
                 scsi_addr = os.path.basename(scsi_addr_path[0])
 
             # vpd_pg80 isn't guaranteed (libvirt, vmware for example)
-            serial_raw = read_file(['/sys/block/{}/device/vpd_pg80'.format(dev)])
-            serial = (''.join(i for i in serial_raw if i in string.printable)).strip()
+            serial_raw = read_file(
+                ['/sys/block/{}/device/vpd_pg80'.format(dev)]
+            )
+            serial = (
+                ''.join(i for i in serial_raw if i in string.printable)
+            ).strip()
             if serial.lower() == 'unknown':
                 serial = ''
             else:
@@ -351,25 +383,33 @@ class HostFacts():
                         enclosure_id = enc_id
                         enclosure_slot = enclosure.device_lookup[serial]
 
-            disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor)
+            disk_vendor = HostFacts._disk_vendor_workarounds.get(
+                vendor, vendor
+            )
             disk_size_bytes = self._get_capacity(dev)
-            disk_list.append({
-                'description': '{} {} ({})'.format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)),
-                'vendor': disk_vendor,
-                'model': disk_model,
-                'rev': disk_rev,
-                'wwid': disk_wwid,
-                'dev_name': dev,
-                'disk_size_bytes': disk_size_bytes,
-                'disk_type': disk_type,
-                'serial': serial,
-                'alt_dev_name': '',
-                'scsi_addr': scsi_addr,
-                'enclosure_id': enclosure_id,
-                'enclosure_slot': enclosure_slot,
-                'path_id': disk_path_map.get(dev, ''),
-                'mpath': mpath,
-            })
+            disk_list.append(
+                {
+                    'description': '{} {} ({})'.format(
+                        disk_vendor,
+                        disk_model,
+                        bytes_to_human(disk_size_bytes),
+                    ),
+                    'vendor': disk_vendor,
+                    'model': disk_model,
+                    'rev': disk_rev,
+                    'wwid': disk_wwid,
+                    'dev_name': dev,
+                    'disk_size_bytes': disk_size_bytes,
+                    'disk_type': disk_type,
+                    'serial': serial,
+                    'alt_dev_name': '',
+                    'scsi_addr': scsi_addr,
+                    'enclosure_id': enclosure_id,
+                    'enclosure_slot': enclosure_slot,
+                    'path_id': disk_path_map.get(dev, ''),
+                    'mpath': mpath,
+                }
+            )
 
         # process the devices to drop duplicate physical devs based on matching
         # the unique serial number
@@ -399,7 +439,9 @@ class HostFacts():
     def flash_list(self):
         # type: () -> List[Dict[str, object]]
         """Return a list of devices that are flash based (SSD, NVMe)"""
-        return [dev for dev in self._device_list if dev['disk_type'] == 'flash']
+        return [
+            dev for dev in self._device_list if dev['disk_type'] == 'flash'
+        ]
 
     @property
     def hdd_capacity_bytes(self):
@@ -457,28 +499,42 @@ class HostFacts():
             if not os.path.exists(nic_path):
                 continue
             for iface in os.listdir(nic_path):
-
                 if os.path.exists(os.path.join(nic_path, iface, 'bridge')):
                     nic_type = 'bridge'
                 elif os.path.exists(os.path.join(nic_path, iface, 'bonding')):
                     nic_type = 'bonding'
                 else:
-                    nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown')
+                    nic_type = hw_lookup.get(
+                        read_file([os.path.join(nic_path, iface, 'type')]),
+                        'Unknown',
+                    )
 
                 if nic_type == 'loopback':  # skip loopback devices
                     continue
 
-                lower_devs_list = [os.path.basename(link.replace('lower_', '')) for link in glob(os.path.join(nic_path, iface, 'lower_*'))]
-                upper_devs_list = [os.path.basename(link.replace('upper_', '')) for link in glob(os.path.join(nic_path, iface, 'upper_*'))]
+                lower_devs_list = [
+                    os.path.basename(link.replace('lower_', ''))
+                    for link in glob(os.path.join(nic_path, iface, 'lower_*'))
+                ]
+                upper_devs_list = [
+                    os.path.basename(link.replace('upper_', ''))
+                    for link in glob(os.path.join(nic_path, iface, 'upper_*'))
+                ]
 
                 try:
-                    mtu = int(read_file([os.path.join(nic_path, iface, 'mtu')]))
+                    mtu = int(
+                        read_file([os.path.join(nic_path, iface, 'mtu')])
+                    )
                 except ValueError:
                     mtu = 0
 
-                operstate = read_file([os.path.join(nic_path, iface, 'operstate')])
+                operstate = read_file(
+                    [os.path.join(nic_path, iface, 'operstate')]
+                )
                 try:
-                    speed = int(read_file([os.path.join(nic_path, iface, 'speed')]))
+                    speed = int(
+                        read_file([os.path.join(nic_path, iface, 'speed')])
+                    )
                 except (OSError, ValueError):
                     # OSError : device doesn't support the ethtool get_link_ksettings
                     # ValueError : raised when the read fails, and returns Unknown
@@ -491,7 +547,9 @@ class HostFacts():
                     iftype = 'physical'
                     driver_path = os.path.join(dev_link, 'driver')
                     if os.path.exists(driver_path):
-                        driver = os.path.basename(os.path.realpath(driver_path))
+                        driver = os.path.basename(
+                            os.path.realpath(driver_path)
+                        )
                     else:
                         driver = 'Unknown'
 
@@ -613,12 +671,14 @@ class HostFacts():
     def kernel_security(self):
         # type: () -> Dict[str, str]
         """Determine the security features enabled in the kernel - SELinux, AppArmor"""
+
         def _fetch_selinux() -> Dict[str, str]:
             """Get the selinux status"""
             security = {}
             try:
-                out, err, code = call(self.ctx, ['sestatus'],
-                                      verbosity=CallVerbosity.QUIET)
+                out, err, code = call(
+                    self.ctx, ['sestatus'], verbosity=CallVerbosity.QUIET
+                )
                 security['type'] = 'SELinux'
                 status, mode, policy = '', '', ''
                 for line in out.split('\n'):
@@ -634,7 +694,9 @@ class HostFacts():
                 if status == 'disabled':
                     security['description'] = 'SELinux: Disabled'
                 else:
-                    security['description'] = 'SELinux: Enabled({}, {})'.format(mode, policy)
+                    security[
+                        'description'
+                    ] = 'SELinux: Enabled({}, {})'.format(mode, policy)
             except Exception as e:
                 logger.info('unable to get selinux status: %s' % e)
             return security
@@ -647,7 +709,9 @@ class HostFacts():
                     security['type'] = 'AppArmor'
                     security['description'] = 'AppArmor: Enabled'
                     try:
-                        profiles = read_file(['/sys/kernel/security/apparmor/profiles'])
+                        profiles = read_file(
+                            ['/sys/kernel/security/apparmor/profiles']
+                        )
                         if len(profiles) == 0:
                             return {}
                     except OSError:
@@ -661,7 +725,9 @@ class HostFacts():
                                 summary[mode] += 1
                             else:
                                 summary[mode] = 0
-                        summary_str = ','.join(['{} {}'.format(v, k) for k, v in summary.items()])
+                        summary_str = ','.join(
+                            ['{} {}'.format(v, k) for k, v in summary.items()]
+                        )
                         security = {**security, **summary}  # type: ignore
                         security['description'] += '({})'.format(summary_str)
 
@@ -678,7 +744,7 @@ class HostFacts():
             else:
                 return {
                     'type': 'Unknown',
-                    'description': 'Linux Security Module framework is active, but is not using SELinux or AppArmor'
+                    'description': 'Linux Security Module framework is active, but is not using SELinux or AppArmor',
                 }
 
         if ret:
@@ -686,13 +752,14 @@ class HostFacts():
 
         return {
             'type': 'None',
-            'description': 'Linux Security Module framework is not available'
+            'description': 'Linux Security Module framework is not available',
         }
 
     @property
     def selinux_enabled(self) -> bool:
-        return (self.kernel_security['type'] == 'SELinux') and \
-               (self.kernel_security['description'] != 'SELinux: Disabled')
+        return (self.kernel_security['type'] == 'SELinux') and (
+            self.kernel_security['description'] != 'SELinux: Disabled'
+        )
 
     @property
     def kernel_parameters(self):
@@ -700,14 +767,21 @@ class HostFacts():
         """Get kernel parameters required/used in Ceph clusters"""
 
         k_param = {}
-        out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT)
+        out, _, _ = call_throws(
+            self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT
+        )
         if out:
             param_list = out.split('\n')
-            param_dict = {param.split(' = ')[0]: param.split(' = ')[-1] for param in param_list}
+            param_dict = {
+                param.split(' = ')[0]: param.split(' = ')[-1]
+                for param in param_list
+            }
 
             # return only desired parameters
             if 'net.ipv4.ip_nonlocal_bind' in param_dict:
-                k_param['net.ipv4.ip_nonlocal_bind'] = param_dict['net.ipv4.ip_nonlocal_bind']
+                k_param['net.ipv4.ip_nonlocal_bind'] = param_dict[
+                    'net.ipv4.ip_nonlocal_bind'
+                ]
 
         return k_param
 
@@ -717,10 +791,7 @@ class HostFacts():
         # Connections state documentation
         # tcp - https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/net/tcp_states.h
         # udp - uses 07 (TCP_CLOSE or UNCONN, since udp is stateless. test with netcat -ul <port>)
-        listening_state = {
-            'tcp': '0A',
-            'udp': '07'
-        }
+        listening_state = {'tcp': '0A', 'udp': '07'}
 
         if protocol not in listening_state.keys():
             return []
@@ -757,9 +828,12 @@ class HostFacts():
         # type: () -> str
         """Return the attributes of this HostFacts object as json"""
         data = {
-            k: getattr(self, k) for k in dir(self)
+            k: getattr(self, k)
+            for k in dir(self)
             if not k.startswith('_')
-            and isinstance(getattr(self, k), (float, int, str, list, dict, tuple))
+            and isinstance(
+                getattr(self, k), (float, int, str, list, dict, tuple)
+            )
         }
         return json.dumps(data, indent=2, sort_keys=True)
 
@@ -778,17 +852,25 @@ def list_networks(ctx):
     return res
 
 
-def _list_ipv4_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]:
+def _list_ipv4_networks(
+    ctx: CephadmContext,
+) -> Dict[str, Dict[str, Set[str]]]:
     execstr: Optional[str] = find_executable('ip')
     if not execstr:
         raise FileNotFoundError("unable to find 'ip' command")
-    out, _, _ = call_throws(ctx, [execstr, 'route', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
+    out, _, _ = call_throws(
+        ctx,
+        [execstr, 'route', 'ls'],
+        verbosity=CallVerbosity.QUIET_UNLESS_ERROR,
+    )
     return _parse_ipv4_route(out)
 
 
 def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, Set[str]]]:
     r = {}  # type: Dict[str, Dict[str, Set[str]]]
-    p = re.compile(r'^(\S+) (?:via \S+)? ?dev (\S+) (.*)scope link (.*)src (\S+)')
+    p = re.compile(
+        r'^(\S+) (?:via \S+)? ?dev (\S+) (.*)scope link (.*)src (\S+)'
+    )
     for line in out.splitlines():
         m = p.findall(line)
         if not m:
@@ -806,18 +888,32 @@ def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, Set[str]]]:
     return r
 
 
-def _list_ipv6_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]:
+def _list_ipv6_networks(
+    ctx: CephadmContext,
+) -> Dict[str, Dict[str, Set[str]]]:
     execstr: Optional[str] = find_executable('ip')
     if not execstr:
         raise FileNotFoundError("unable to find 'ip' command")
-    routes, _, _ = call_throws(ctx, [execstr, '-6', 'route', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
-    ips, _, _ = call_throws(ctx, [execstr, '-6', 'addr', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
+    routes, _, _ = call_throws(
+        ctx,
+        [execstr, '-6', 'route', 'ls'],
+        verbosity=CallVerbosity.QUIET_UNLESS_ERROR,
+    )
+    ips, _, _ = call_throws(
+        ctx,
+        [execstr, '-6', 'addr', 'ls'],
+        verbosity=CallVerbosity.QUIET_UNLESS_ERROR,
+    )
     return _parse_ipv6_route(routes, ips)
 
 
-def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, Set[str]]]:
+def _parse_ipv6_route(
+    routes: str, ips: str
+) -> Dict[str, Dict[str, Set[str]]]:
     r = {}  # type: Dict[str, Dict[str, Set[str]]]
-    route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$')
+    route_p = re.compile(
+        r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$'
+    )
     ip_p = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$')
     iface_p = re.compile(r'^(\d+): (\S+): (.*)$')
     for line in routes.splitlines():
@@ -846,8 +942,11 @@ def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, Set[str]]]:
             continue
         ip = m[0][0]
         # find the network it belongs to
-        net = [n for n in r.keys()
-               if ipaddress.ip_address(ip) in ipaddress.ip_network(n)]
+        net = [
+            n
+            for n in r.keys()
+            if ipaddress.ip_address(ip) in ipaddress.ip_network(n)
+        ]
         if net and iface in r[net[0]]:
             assert iface
             r[net[0]][iface].add(ip)