import tempfile
import time
import errno
+import struct
try:
from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
except ImportError:
##################################
+def get_ipv4_address(ifname):
+ # type: (str) -> str
+ def _extract(sock, offset):
+ return socket.inet_ntop(
+ socket.AF_INET,
+ fcntl.ioctl(
+ sock.fileno(),
+ offset,
+ struct.pack('256s', bytes(ifname[:15], 'utf-8'))
+ )[20:24])
+
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ try:
+ addr = _extract(s, 35093) # '0x8915' = SIOCGIFADDR
+ dq_mask = _extract(s, 35099) # 0x891b = SIOCGIFNETMASK
+ except OSError:
+ # interface does not have an ipv4 address
+ return ''
+
+ dec_mask = sum([bin(int(i)).count('1')
+ for i in dq_mask.split('.')])
+ return '{}/{}'.format(addr, dec_mask)
+
+
+def get_ipv6_address(ifname):
+ # type: (str) -> str
+ if not os.path.exists('/proc/net/if_inet6'):
+ return ''
+
+ raw = rfiles(['/proc/net/if_inet6'])
+ data = raw.splitlines()
+ # based on docs @ https://www.tldp.org/HOWTO/Linux+IPv6-HOWTO/ch11s04.html
+ for iface_setting in data:
+ field = iface_setting.split()
+ if field[-1] == ifname:
+ return "{}/{}".format(field[0], int('0x{}'.format(field[2]), 16))
+ return ''
+
+
+def bytes_to_human(num, mode='decimal'):
+ # type: (float, str) -> str
+ unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
+ divisor = 1000.0
+ yotta = "YB"
+
+ if mode == 'binary':
+ unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']
+ divisor = 1024.0
+ yotta = "YiB"
+
+ for unit in unit_list:
+ if abs(num) < divisor:
+ return "%3.1f%s" % (num, unit)
+ num /= divisor
+ return "%.1f%s" % (num, yotta)
+
+
+def rfiles(path_list, file_name=''):
+ # type: (List[str], str) -> str
+ for path in path_list:
+ if file_name:
+ file_path = os.path.join(path, file_name)
+ else:
+ file_path = path
+ if os.path.exists(file_path):
+ with open(file_path, 'r') as f:
+ content = f.read().strip()
+ return content
+ return "Unknown"
+
+
+##################################
+class HostFacts():
+ _dmi_path_list = ['/sys/class/dmi/id']
+ _nic_path_list = ['/sys/class/net']
+ _disk_vendor_workarounds = {
+ "0x1af4": "Virtio Block Device"
+ }
+
+ def __init__(self):
+ self.cpu_model = 'Unknown'
+ self.cpu_count = 0
+ self.cpu_cores = 0
+ self.cpu_threads = 0
+ self.interfaces = {}
+
+ self._memory = rfiles(['/proc/meminfo'])
+ self._get_cpuinfo()
+ self._process_nics()
+ self.arch = platform.processor()
+ self.kernel = platform.release()
+
+ def _get_cpuinfo(self):
+ raw = rfiles(['/proc/cpuinfo'])
+ output = raw.splitlines()
+ cpu_set = set()
+
+ for line in output:
+ field = line.split(':')
+ if "model name" in line:
+ self.cpu_model = field[1]
+ if "physical id" in line:
+ cpu_set.add(field[1])
+ if "siblings" in line:
+ self.cpu_threads = int(field[1].strip())
+ if "cpu cores" in line:
+ self.cpu_cores = int(field[1].strip())
+ pass
+ self.cpu_count = len(cpu_set)
+
+ def _get_block_devs(self):
+ # type: () -> List[str]
+ return [dev for dev in os.listdir('/sys/block')
+ if not dev.startswith('dm')]
+
+ def _get_devs_by_type(self, rota='0'):
+ # type: (str) -> List[str]
+ devs = list()
+ for blk_dev in self._get_block_devs():
+ rot_path = '/sys/block/{}/queue/rotational'.format(blk_dev)
+ rot_value = rfiles([rot_path])
+ if rot_value == rota:
+ devs.append(blk_dev)
+ return devs
+
+ @property
+ def operating_system(self):
+ # type: () -> str
+ raw_info = rfiles(['/etc/os-release'])
+ os_release = raw_info.splitlines()
+ rel_str = 'Unknown'
+ rel_dict = dict()
+
+ for line in os_release:
+ if "=" in line:
+ var_name, var_value = line.split('=')
+ rel_dict[var_name] = var_value.replace('"', '')
+
+ # Would normally use PRETTY_NAME, but NAME and VERSION are more
+ # consistent
+ if all(_v in rel_dict for _v in ["NAME", "VERSION"]):
+ rel_str = "{} {}".format(rel_dict['NAME'], rel_dict['VERSION'])
+ return rel_str
+
+ @property
+ def subscribed(self):
+ # type: () -> str
+ def _red_hat():
+ # type: () -> str
+ # RHEL 7 and RHEL 8
+ entitlements_dir = '/etc/pki/entitlement'
+ if os.path.exists(entitlements_dir):
+ pems = glob('{}/*.pem'.format(entitlements_dir))
+ if len(pems) >= 2:
+ return "Yes"
+
+ return "No"
+
+ os_name = self.operating_system
+ if os_name.upper().startswith("RED HAT"):
+ return _red_hat()
+
+ return "Unknown"
+
+ @property
+ def hdd_count(self):
+ # type: () -> int
+ return len(self._get_devs_by_type(rota='1'))
+
+ def _get_capacity(self, dev):
+ # type: (str) -> int
+ size_path = os.path.join('/sys/block', dev, 'size')
+ size_blocks = int(rfiles([size_path]))
+ blk_path = os.path.join('/sys/block', dev, 'queue', 'logical_block_size')
+ blk_count = int(rfiles([blk_path]))
+ return size_blocks * blk_count
+
+ def _get_capacity_by_type(self, rota='0'):
+ # type: (str) -> int
+ devs = self._get_devs_by_type(rota=rota)
+ capacity = 0
+ for dev in devs:
+ capacity += self._get_capacity(dev)
+ return capacity
+
+ def _dev_list(self, dev_list):
+ # type: (List[str]) -> List[str]
+ disk_list = list()
+
+ for dev in dev_list:
+ disk_model = rfiles(['/sys/block/{}/device/model'.format(dev)]).strip()
+ vendor = rfiles(['/sys/block/{}/device/vendor'.format(dev)]).strip()
+ disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor)
+ disk_size_bytes = self._get_capacity(dev)
+ disk_list.append("{} {} ({})".format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)))
+ return disk_list
+
+ @property
+ def hdd_list(self):
+ # type: () -> List[str]
+ devs = self._get_devs_by_type(rota='1')
+ return self._dev_list(devs)
+
+ @property
+ def flash_list(self):
+ # type: () -> List[str]
+ devs = self._get_devs_by_type(rota='0')
+ return self._dev_list(devs)
+
+ @property
+ def hdd_capacity_bytes(self):
+ # type: () -> int
+ return self._get_capacity_by_type(rota='1')
+
+ @property
+ def hdd_capacity(self):
+ # type: () -> str
+ return bytes_to_human(self.hdd_capacity_bytes)
+
+ @property
+ def flash_count(self):
+ # type: () -> int
+ return len(self._get_devs_by_type(rota='0'))
+
+ @property
+ def flash_capacity_bytes(self):
+ # type: () -> int
+ return self._get_capacity_by_type(rota='0')
+
+ @property
+ def flash_capacity(self):
+ # type: () -> str
+ return bytes_to_human(self.flash_capacity_bytes)
+
+ def _process_nics(self):
+ # type: () -> None
+ for nic_path in HostFacts._nic_path_list:
+ if not os.path.exists(nic_path):
+ continue
+ for iface in os.listdir(nic_path):
+
+ mtu = rfiles([os.path.join(nic_path, iface, 'mtu')])
+ operstate = rfiles([os.path.join(nic_path, iface, 'operstate')])
+ try:
+ speed = rfiles([os.path.join(nic_path, iface, 'speed')])
+ except OSError:
+ # OSError is raised when the device doesn't support
+ # the ethtool get_link_ksettings, so we'll use -1
+ # to indicate unknown (-1 is also shown for NICs
+ # that are not link-up)
+ speed = "-1"
+
+ iftype = 'physical' if os.path.exists(os.path.join(nic_path, iface, 'device')) else 'logical'
+
+ self.interfaces[iface] = {
+ "mtu": mtu,
+ "operstate": operstate,
+ "iftype": iftype,
+ "speed": speed,
+ "ipv4_address": get_ipv4_address(iface),
+ "ipv6_address": get_ipv6_address(iface),
+ }
+
+ @property
+ def nic_count(self):
+ # type: () -> int
+ phys_devs = []
+ for iface in self.interfaces:
+ if self.interfaces[iface]["iftype"] == 'physical':
+ phys_devs.append(iface)
+ return len(phys_devs)
+
+ @property
+ def memory_kb(self):
+ # type: () -> int
+ out = self._memory.splitlines()
+ for line in out:
+ if line.startswith('MemTotal'):
+ _d = line.split()
+ return int(_d[1])
+ return 0
+
+ @property
+ def vendor(self):
+ # type: () -> str
+ return rfiles(HostFacts._dmi_path_list, "sys_vendor")
+
+ @property
+ def model(self):
+ # type: () -> str
+ family = rfiles(HostFacts._dmi_path_list, "product_family")
+ product = rfiles(HostFacts._dmi_path_list, "product_name")
+ if family == 'Unknown' and product:
+ return "{}".format(product)
+
+ return "{} ({})".format(family, product)
+
+ @property
+ def bios_version(self):
+ # type: () -> str
+ return rfiles(HostFacts._dmi_path_list, "bios_version")
+
+ @property
+ def bios_date(self):
+ # type: () -> str
+ return rfiles(HostFacts._dmi_path_list, "bios_date")
+
+ def dump(self, format_type='json'):
+ # type: (str) -> str
+ return self._to_json()
+
+ def _to_json(self):
+ # type: () -> str
+ data = {k: getattr(self, k) for k in dir(self)
+ if not k.startswith('_') and
+ isinstance(getattr(self, k),
+ (float, int, str, list, dict, tuple))
+ }
+ return json.dumps(data, indent=2, sort_keys=True)
+
+
+##################################
+
+def command_gather_facts():
+ host = HostFacts()
+ print(host.dump(args.fact_format))
+
+
+##################################
+
def _get_parser():
# type: () -> argparse.ArgumentParser
'--fsid',
help='cluster FSID')
+ parser_gather_facts = subparsers.add_parser(
+ 'gather-facts', help='gather and return host related information')
+ parser_gather_facts.set_defaults(func=command_gather_facts)
+ parser_gather_facts.add_argument(
+ '--fact-format',
+ choices=['json'],
+ default='json',
+ help='format for the output')
+
return parser