if not os.path.exists('/proc/net/if_inet6'):
return ''
- raw = rfiles(['/proc/net/if_inet6'])
+ raw = read_file(['/proc/net/if_inet6'])
data = raw.splitlines()
# based on docs @ https://www.tldp.org/HOWTO/Linux+IPv6-HOWTO/ch11s04.html
+ # field 0 is ipv6, field 2 is scope
for iface_setting in data:
field = iface_setting.split()
if field[-1] == ifname:
- return "{}/{}".format(field[0], int('0x{}'.format(field[2]), 16))
+ ipv6_raw = field[0]
+ ipv6_fmtd = ":".join([ipv6_raw[_p:_p+4] for _p in range(0, len(field[0]),4)])
+ # apply naming rules using ipaddress module
+ ipv6 = ipaddress.ip_address(ipv6_fmtd)
+ return "{}/{}".format(str(ipv6), int('0x{}'.format(field[2]), 16))
return ''
def bytes_to_human(num, mode='decimal'):
# type: (float, str) -> str
+ """Convert a bytes value into it's human-readable form.
+
+ :param num: number, in bytes, to convert
+ :param mode: Either decimal (default) or binary to determine divisor
+ :returns: string representing the bytes value in a more readable format
+ """
unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
divisor = 1000.0
yotta = "YB"
return "%.1f%s" % (num, yotta)
-def rfiles(path_list, file_name=''):
+def read_file(path_list, file_name=''):
# type: (List[str], str) -> str
+ """Returns the content of the first file found within the `path_list`
+
+ :param path_list: list of file paths to search
+ :param file_name: optional file_name to be applied to a file path
+ :returns: content of the file or 'Unknown'
+ """
for path in path_list:
if file_name:
file_path = os.path.join(path, file_name)
class HostFacts():
_dmi_path_list = ['/sys/class/dmi/id']
_nic_path_list = ['/sys/class/net']
+ _selinux_path_list = ['/etc/selinux/config']
+ _apparmor_path_list = ['/etc/apparmor']
_disk_vendor_workarounds = {
"0x1af4": "Virtio Block Device"
}
self.cpu_threads = 0
self.interfaces = {}
- self._memory = rfiles(['/proc/meminfo'])
+ self._memory = read_file(['/proc/meminfo'])
self._get_cpuinfo()
self._process_nics()
self.arch = platform.processor()
self.kernel = platform.release()
def _get_cpuinfo(self):
- raw = rfiles(['/proc/cpuinfo'])
+ # type: () -> None
+ """Determine cpu information via /proc/cpuinfo"""
+ raw = read_file(['/proc/cpuinfo'])
output = raw.splitlines()
cpu_set = set()
for line in output:
- field = line.split(':')
+ field = [l.strip() for l in line.split(':')]
if "model name" in line:
self.cpu_model = field[1]
if "physical id" in line:
def _get_block_devs(self):
# type: () -> List[str]
+ """Determine the list of block devices by looking at /sys/block"""
return [dev for dev in os.listdir('/sys/block')
if not dev.startswith('dm')]
def _get_devs_by_type(self, rota='0'):
# type: (str) -> List[str]
+ """Filter block devices by a given rotational attribute (0=flash, 1=spinner)"""
devs = list()
for blk_dev in self._get_block_devs():
rot_path = '/sys/block/{}/queue/rotational'.format(blk_dev)
- rot_value = rfiles([rot_path])
+ rot_value = read_file([rot_path])
if rot_value == rota:
devs.append(blk_dev)
return devs
@property
def operating_system(self):
# type: () -> str
- raw_info = rfiles(['/etc/os-release'])
+ """Determine OS version"""
+ raw_info = read_file(['/etc/os-release'])
os_release = raw_info.splitlines()
rel_str = 'Unknown'
rel_dict = dict()
for line in os_release:
if "=" in line:
var_name, var_value = line.split('=')
- rel_dict[var_name] = var_value.replace('"', '')
+ rel_dict[var_name] = var_value.strip('"')
# Would normally use PRETTY_NAME, but NAME and VERSION are more
# consistent
rel_str = "{} {}".format(rel_dict['NAME'], rel_dict['VERSION'])
return rel_str
+ @property
+ def hostname(self):
+ # type: () -> str
+ """Return the hostname"""
+ return platform.node()
+
@property
def subscribed(self):
# type: () -> str
+ """Highlevel check to see if the host is subscribed to receive updates/support"""
def _red_hat():
# type: () -> str
# RHEL 7 and RHEL 8
@property
def hdd_count(self):
# type: () -> int
+ """Return a count of HDDs (spinners)"""
return len(self._get_devs_by_type(rota='1'))
def _get_capacity(self, dev):
# type: (str) -> int
+ """Determine the size of a given device"""
size_path = os.path.join('/sys/block', dev, 'size')
- size_blocks = int(rfiles([size_path]))
+ size_blocks = int(read_file([size_path]))
blk_path = os.path.join('/sys/block', dev, 'queue', 'logical_block_size')
- blk_count = int(rfiles([blk_path]))
+ blk_count = int(read_file([blk_path]))
return size_blocks * blk_count
def _get_capacity_by_type(self, rota='0'):
# type: (str) -> int
+ """Return the total capacity of a category of device (flash or hdd)"""
devs = self._get_devs_by_type(rota=rota)
capacity = 0
for dev in devs:
def _dev_list(self, dev_list):
# type: (List[str]) -> List[str]
+ """Return a 'pretty' name list for each device in the `dev_list`"""
disk_list = list()
for dev in dev_list:
- disk_model = rfiles(['/sys/block/{}/device/model'.format(dev)]).strip()
- vendor = rfiles(['/sys/block/{}/device/vendor'.format(dev)]).strip()
+ disk_model = read_file(['/sys/block/{}/device/model'.format(dev)]).strip()
+ vendor = read_file(['/sys/block/{}/device/vendor'.format(dev)]).strip()
disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor)
disk_size_bytes = self._get_capacity(dev)
disk_list.append("{} {} ({})".format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)))
@property
def hdd_list(self):
# type: () -> List[str]
+ """Return a list of devices that are HDDs (spinners)"""
devs = self._get_devs_by_type(rota='1')
return self._dev_list(devs)
@property
def flash_list(self):
# type: () -> List[str]
+ """Return a list of devices that are flash based (SSD, NVMe)"""
devs = self._get_devs_by_type(rota='0')
return self._dev_list(devs)
@property
def hdd_capacity_bytes(self):
# type: () -> int
+ """Return the total capacity for all HDD devices (bytes)"""
return self._get_capacity_by_type(rota='1')
@property
def hdd_capacity(self):
# type: () -> str
+ """Return the total capacity for all HDD devices (human readable format)"""
return bytes_to_human(self.hdd_capacity_bytes)
@property
def flash_count(self):
# type: () -> int
+ """Return the number of flash devices in the system (SSD, NVMe)"""
return len(self._get_devs_by_type(rota='0'))
@property
def flash_capacity_bytes(self):
# type: () -> int
+ """Return the total capacity for all flash devices (bytes)"""
return self._get_capacity_by_type(rota='0')
@property
def flash_capacity(self):
# type: () -> str
+ """Return the total capacity for all Flash devices (human readable format)"""
return bytes_to_human(self.flash_capacity_bytes)
def _process_nics(self):
# type: () -> None
+ """Look at the NIC devices and extract network related metadata"""
+ # from https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_arp.h
+ hw_lookup = {
+ "1": "ethernet",
+ "32": "infiniband",
+ "772": "loopback",
+ }
+
for nic_path in HostFacts._nic_path_list:
if not os.path.exists(nic_path):
continue
for iface in os.listdir(nic_path):
+
+ lower_devs_list = [os.path.basename(link.replace("lower_", "")) for link in glob(os.path.join(nic_path, iface, "lower_*"))]
+ upper_devs_list = [os.path.basename(link.replace("upper_", "")) for link in glob(os.path.join(nic_path, iface, "upper_*"))]
- mtu = rfiles([os.path.join(nic_path, iface, 'mtu')])
- operstate = rfiles([os.path.join(nic_path, iface, 'operstate')])
try:
- speed = rfiles([os.path.join(nic_path, iface, 'speed')])
- except OSError:
- # OSError is raised when the device doesn't support
- # the ethtool get_link_ksettings, so we'll use -1
- # to indicate unknown (-1 is also shown for NICs
- # that are not link-up)
- speed = "-1"
+ mtu = int(read_file([os.path.join(nic_path, iface, 'mtu')]))
+ except ValueError:
+ mtu = 0
+
+ operstate = read_file([os.path.join(nic_path, iface, 'operstate')])
+ try:
+ speed = int(read_file([os.path.join(nic_path, iface, 'speed')]))
+ except (OSError, ValueError):
+ # OSError : device doesn't support the ethtool get_link_ksettings
+ # ValueError : raised when the read fails, and returns Unknown
+ #
+ # Either way, we show a -1 when speed isn't available
+ speed = -1
+
+ if os.path.exists(os.path.join(nic_path, iface, 'bridge')):
+ nic_type = "bridge"
+ elif os.path.exists(os.path.join(nic_path, iface, 'bonding')):
+ nic_type = "bonding"
+ else:
+ nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), "Unknown")
+
+ dev_link = os.path.join(nic_path, iface, 'device')
+ if os.path.exists(dev_link):
+ iftype = 'physical'
+ driver_path = os.path.join(dev_link, 'driver')
+ if os.path.exists(driver_path):
+ driver = os.path.basename(
+ os.path.realpath(driver_path))
+ else:
+ driver = 'Unknown'
- iftype = 'physical' if os.path.exists(os.path.join(nic_path, iface, 'device')) else 'logical'
+ else:
+ iftype = 'logical'
+ driver = ''
+
+
self.interfaces[iface] = {
"mtu": mtu,
+ "upper_devs_list": upper_devs_list,
+ "lower_devs_list": lower_devs_list,
"operstate": operstate,
"iftype": iftype,
+ "nic_type": nic_type,
+ "driver": driver,
"speed": speed,
"ipv4_address": get_ipv4_address(iface),
"ipv6_address": get_ipv6_address(iface),
@property
def nic_count(self):
# type: () -> int
+ """Return a total count of all physical NICs detected in the host"""
phys_devs = []
for iface in self.interfaces:
if self.interfaces[iface]["iftype"] == 'physical':
@property
def memory_kb(self):
# type: () -> int
+ """Determine the memory installed"""
out = self._memory.splitlines()
for line in out:
if line.startswith('MemTotal'):
@property
def vendor(self):
# type: () -> str
- return rfiles(HostFacts._dmi_path_list, "sys_vendor")
+ """Determine server vendor from DMI data in sysfs"""
+ return read_file(HostFacts._dmi_path_list, "sys_vendor")
@property
def model(self):
# type: () -> str
- family = rfiles(HostFacts._dmi_path_list, "product_family")
- product = rfiles(HostFacts._dmi_path_list, "product_name")
+ """Determine server model information from DMI data in sysfs"""
+ family = read_file(HostFacts._dmi_path_list, "product_family")
+ product = read_file(HostFacts._dmi_path_list, "product_name")
if family == 'Unknown' and product:
return "{}".format(product)
@property
def bios_version(self):
# type: () -> str
- return rfiles(HostFacts._dmi_path_list, "bios_version")
+ """Determine server BIOS version from DMI data in sysfs"""
+ return read_file(HostFacts._dmi_path_list, "bios_version")
@property
def bios_date(self):
# type: () -> str
- return rfiles(HostFacts._dmi_path_list, "bios_date")
+ """Determine server BIOS date from DMI data in sysfs"""
+ return read_file(HostFacts._dmi_path_list, "bios_date")
- def dump(self, format_type='json'):
- # type: (str) -> str
- return self._to_json()
+ @property
+ def timestamp(self):
+ # type: () -> float
+ """Return the current time as Epoch seconds"""
+ return time.time()
+
+ @property
+ def system_uptime(self):
+ # type: () -> float
+ """Return the system uptime (in secs)"""
+ raw_time = read_file(['/proc/uptime'])
+ up_secs, _ = raw_time.split()
+ return float(up_secs)
+
+ @property
+ def kernel_security(self):
+ # type: () -> Dict[str -> str]
+ """Determine the security features enabled in the kernel - SELinux, AppArmor"""
+ def _fetch_selinux():
+ """Read the selinux config file to determine state"""
+ security = {}
+ for selinux_path in HostFacts._selinux_path_list:
+ if os.path.exists(selinux_path):
+ selinux_config = read_file([selinux_path]).splitlines()
+ security['type'] = 'SELinux'
+ for line in selinux_config:
+ if line.strip().startswith('#'):
+ continue
+ k, v = line.split('=')
+ security[k] = v
+ if security['SELINUX'].lower() == "disabled":
+ security['description'] = "SELinux: Disabled"
+ else:
+ security['description'] = "SELinux: Enabled({}, {})".format(security['SELINUX'], security['SELINUXTYPE'])
+ return security
+
+ def _fetch_apparmor():
+ """Read the apparmor profiles directly, returning an overview of AppArmor status"""
+ security = {}
+ for apparmor_path in HostFacts._apparmor_path_list:
+ if os.path.exists(apparmor_path):
+ security['type'] = "AppArmor"
+ security['description'] = "AppArmor: Enabled"
+ try:
+ profiles = read_file(['/sys/kernel/security/apparmor/profiles'])
+ except OSError:
+ pass
+ else:
+ summary = {}
+ for line in profiles.split('\n'):
+ item, mode = line.split(' ')
+ mode= mode.strip('()')
+ if mode in summary:
+ summary[mode] += 1
+ else:
+ summary[mode] = 0
+ summary_str = ",".join(["{} {}".format(v, k) for k, v in summary.items()])
+ security = {**security, **summary}
+ security['description'] += "({})".format(summary_str)
+
+ return security
+
+ if os.path.exists('/sys/kernel/security/lsm'):
+ lsm = read_file(['/sys/kernel/security/lsm']).strip()
+ if 'selinux' in lsm:
+ return _fetch_selinux()
+ elif 'apparmor' in lsm:
+ return _fetch_apparmor()
+ else:
+ return {
+ "type": "Unknown",
+ "description": "Linux Security Module framework is active, but is not using SELinux or AppArmor"
+ }
- def _to_json(self):
+ return {
+ "type": "None",
+ "description": "Linux Security Module framework is not available"
+ }
+
+ def dump(self):
# type: () -> str
+ """Return the attributes of this HostFacts object as json"""
data = {k: getattr(self, k) for k in dir(self)
if not k.startswith('_') and
isinstance(getattr(self, k),
}
return json.dumps(data, indent=2, sort_keys=True)
-
##################################
def command_gather_facts():
+ """gather_facts is intended to provide host releated metadata to the caller"""
host = HostFacts()
- print(host.dump(args.fact_format))
+ print(host.dump())
##################################
help='cluster FSID')
parser_gather_facts = subparsers.add_parser(
- 'gather-facts', help='gather and return host related information')
+ 'gather-facts', help='gather and return host related information (JSON format)')
parser_gather_facts.set_defaults(func=command_gather_facts)
- parser_gather_facts.add_argument(
- '--fact-format',
- choices=['json'],
- default='json',
- help='format for the output')
return parser