else:
file_path = path
if os.path.exists(file_path):
- with open(file_path, 'r') as f:
+ with open(file_path, 'rb') as f:
try:
- content = f.read().strip()
+ content = f.read().decode('utf-8', 'ignore').strip()
except OSError:
# sysfs may populate the file, but for devices like
# virtio reads can fail
##################################
+class Enclosure:
+ def __init__(self, enc_id: str, enc_path: str, dev_path: str):
+ """External disk enclosure metadata
+
+ Args:
+ :param enc_id: enclosure id (normally a WWN)
+ :param enc_path: sysfs path to HBA attached to the enclosure
+ e.g. /sys/class/scsi_generic/sg11/device/enclosure/0:0:9:0
+ :param dev_path: sysfs path to the generic scsi device for the enclosure HBA
+ e.g. /sys/class/scsi_generic/sg2
+ """
+ self._path: str = dev_path
+ self._dev_path: str = os.path.join(dev_path, 'device')
+ self._enc_path: str = enc_path
+ self.ses_paths: List[str] = []
+ self.path_count: int = 0
+ self.vendor: str = ''
+ self.model: str = ''
+ self.enc_id: str = enc_id
+ self.components: Union[int, str] = 0
+ self.device_lookup: Dict[str, str] = {}
+ self.device_count: int = 0
+ self.slot_map: Dict[str, Dict[str, str]] = {}
+
+ self._probe()
+
+ def _probe(self) -> None:
+ """Analyse the dev paths to identify enclosure related information"""
+
+ self.vendor = read_file([os.path.join(self._dev_path, 'vendor')])
+ self.model = read_file([os.path.join(self._dev_path, 'model')])
+ self.components = read_file([os.path.join(self._enc_path, 'components')])
+ slot_paths = glob(os.path.join(self._enc_path, '*', 'slot'))
+ for slot_path in slot_paths:
+ slot = read_file([slot_path])
+ serial_path = os.path.join(os.path.dirname(slot_path), 'device', 'vpd_pg80')
+ serial = ''
+ if os.path.exists(serial_path):
+ serial_raw = read_file([serial_path])
+ serial = (''.join(char for char in serial_raw if char in string.printable)).strip()
+ self.device_lookup[serial] = slot
+ slot_dir = os.path.dirname(slot_path)
+ self.slot_map[slot] = {
+ 'status': read_file([os.path.join(slot_dir, 'status')]),
+ 'fault': read_file([os.path.join(slot_dir, 'fault')]),
+ 'locate': read_file([os.path.join(slot_dir, 'locate')]),
+ 'serial': serial,
+ }
+
+ self.device_count = len(self.device_lookup)
+ self.update(os.path.basename(self._path))
+
+ def update(self, dev_id: str) -> None:
+ """Update an enclosure object with a related sg device name
+
+ :param dev_id (str): device name e.g. sg2
+ """
+ self.ses_paths.append(dev_id)
+ self.path_count = len(self.ses_paths)
+
+ def _dump(self) -> Dict[str, Any]:
+ """Return a dict representation of the object"""
+ return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
+
+ def __str__(self) -> str:
+ """Return a formatted json representation of the object as a string"""
+ return json.dumps(self._dump(), indent=2)
+
+ def __repr__(self) -> str:
+ """Return a json representation of the object as a string"""
+ return json.dumps(self._dump())
+
+ def as_json(self) -> Dict[str, Any]:
+ """Return a dict representing the object"""
+ return self._dump()
+
+
class HostFacts():
_dmi_path_list = ['/sys/class/dmi/id']
_nic_path_list = ['/sys/class/net']
_disk_vendor_workarounds = {
'0x1af4': 'Virtio Block Device'
}
- _excluded_block_devices = ('sr', 'zram', 'dm-')
+ _excluded_block_devices = ('sr', 'zram', 'dm-', 'loop', 'md')
+ _sg_generic_glob = '/sys/class/scsi_generic/*'
def __init__(self, ctx: CephadmContext):
self.ctx: CephadmContext = ctx
self._process_nics()
self.arch: str = platform.processor()
self.kernel: str = platform.release()
+ self._enclosures = self._discover_enclosures()
+ self._block_devices = self._get_block_devs()
+ self._device_list = self._get_device_info()
+
+ def _discover_enclosures(self) -> Dict[str, Enclosure]:
+ """Build a dictionary of discovered scsi enclosures
+
+ Enclosures are detected by walking the scsi generic sysfs hierarchy.
+ Any device tree that holds an 'enclosure' subdirectory is interpreted as
+ an enclosure. Once identified the enclosire directory is analysis to
+ identify key descriptors that will help relate disks to enclosures and
+ disks to enclosure slots.
+
+ :return: Dict[str, Enclosure]: a map of enclosure id (hex) to enclosure object
+ """
+ sg_paths: List[str] = glob(HostFacts._sg_generic_glob)
+ enclosures: Dict[str, Enclosure] = {}
+
+ for sg_path in sg_paths:
+ enc_path = os.path.join(sg_path, 'device', 'enclosure')
+ if os.path.exists(enc_path):
+ enc_dirs = glob(os.path.join(enc_path, '*'))
+ if len(enc_dirs) != 1:
+ # incomplete enclosure spec - expecting ONE dir in the fomrat
+ # host(adapter):bus:target:lun e.g. 16:0:0:0
+ continue
+ enc_path = enc_dirs[0]
+ enc_id = read_file([os.path.join(enc_path, 'id')])
+ if enc_id in enclosures:
+ enclosures[enc_id].update(os.path.basename(sg_path))
+ continue
+
+ enclosure = Enclosure(enc_id, enc_path, sg_path)
+ enclosures[enc_id] = enclosure
+
+ return enclosures
+
+ @property
+ def enclosures(self) -> Dict[str, Dict[str, Any]]:
+ """Dump the enclosure objects as dicts"""
+ return {k: v._dump() for k, v in self._enclosures.items()}
+
+ @property
+ def enclosure_count(self) -> int:
+ """Return the number of enclosures detected"""
+ return len(self._enclosures.keys())
def _get_cpuinfo(self):
# type: () -> None
return [dev for dev in os.listdir('/sys/block')
if not dev.startswith(HostFacts._excluded_block_devices)]
- def _get_devs_by_type(self, rota='0'):
- # type: (str) -> List[str]
- """Filter block devices by a given rotational attribute (0=flash, 1=spinner)"""
- devs = list()
- for blk_dev in self._get_block_devs():
- rot_path = '/sys/block/{}/queue/rotational'.format(blk_dev)
- rot_value = read_file([rot_path])
- if rot_value == rota:
- devs.append(blk_dev)
- return devs
-
@property
def operating_system(self):
# type: () -> str
def hdd_count(self):
# type: () -> int
"""Return a count of HDDs (spinners)"""
- return len(self._get_devs_by_type(rota='1'))
+ return len(self.hdd_list)
def _get_capacity(self, dev):
# type: (str) -> int
blk_count = int(read_file([blk_path]))
return size_blocks * blk_count
- def _get_capacity_by_type(self, rota='0'):
+ def _get_capacity_by_type(self, disk_type='hdd'):
# type: (str) -> int
"""Return the total capacity of a category of device (flash or hdd)"""
- devs = self._get_devs_by_type(rota=rota)
- capacity = 0
- for dev in devs:
- capacity += self._get_capacity(dev)
+ capacity: int = 0
+ for dev in self._device_list:
+ if dev['disk_type'] == disk_type:
+ disk_capacity = cast(int, dev.get('disk_size_bytes', 0))
+ capacity += disk_capacity
return capacity
- def _dev_list(self, dev_list):
- # type: (List[str]) -> List[Dict[str, object]]
- """Return a 'pretty' name list for each device in the `dev_list`"""
+ def _get_device_info(self):
+ # type: () -> List[Dict[str, object]]
+ """Return a 'pretty' name list for each unique device in the `dev_list`"""
disk_list = list()
- for dev in dev_list:
+ # serial_num_lookup is a dict of serial number -> List of devices with that serial number
+ serial_num_lookup: Dict[str, List[str]] = {}
+
+ # make a map of devname -> disk path. this path name may indicate the physical slot
+ # of a drive (phyXX)
+ disk_path_map: Dict[str, str] = {}
+ for path in glob('/dev/disk/by-path/*'):
+ tgt_raw = Path(path).resolve()
+ tgt = os.path.basename(str(tgt_raw))
+ disk_path_map[tgt] = path
+
+ # make a map of holder (dm-XX) -> full mpath name
+ dm_device_map: Dict[str, str] = {}
+ for mpath in glob('/dev/mapper/mpath*'):
+ tgt_raw = Path(mpath).resolve()
+ tgt = os.path.basename(str(tgt_raw))
+ dm_device_map[tgt] = mpath
+
+ # main loop to process all eligible block devices
+ for dev in self._block_devices:
+ enclosure_id = ''
+ enclosure_slot = ''
+ scsi_addr = ''
+ mpath = ''
+
disk_model = read_file(['/sys/block/{}/device/model'.format(dev)]).strip()
disk_rev = read_file(['/sys/block/{}/device/rev'.format(dev)]).strip()
disk_wwid = read_file(['/sys/block/{}/device/wwid'.format(dev)]).strip()
vendor = read_file(['/sys/block/{}/device/vendor'.format(dev)]).strip()
+ rotational = read_file(['/sys/block/{}/queue/rotational'.format(dev)])
+ holders_raw = glob('/sys/block/{}/holders/*'.format(dev))
+ if len(holders_raw) == 1:
+ # mpath will have 1 holder entry
+ holder = os.path.basename(holders_raw[0])
+ mpath = dm_device_map.get(holder, '')
+
+ disk_type = 'hdd' if rotational == '1' else 'flash'
+ scsi_addr_path = glob('/sys/block/{}/device/bsg/*'.format(dev))
+ if len(scsi_addr_path) == 1:
+ scsi_addr = os.path.basename(scsi_addr_path[0])
+
+ # vpd_pg80 isn't guaranteed (libvirt, vmware for example)
+ serial_raw = read_file(['/sys/block/{}/device/vpd_pg80'.format(dev)])
+ serial = (''.join(i for i in serial_raw if i in string.printable)).strip()
+ if serial.lower() == 'unknown':
+ serial = ''
+ else:
+ if serial in serial_num_lookup:
+ serial_num_lookup[serial].append(dev)
+ else:
+ serial_num_lookup[serial] = [dev]
+ for enc_id, enclosure in self._enclosures.items():
+ if serial in enclosure.device_lookup.keys():
+ enclosure_id = enc_id
+ enclosure_slot = enclosure.device_lookup[serial]
+
disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor)
disk_size_bytes = self._get_capacity(dev)
disk_list.append({
'wwid': disk_wwid,
'dev_name': dev,
'disk_size_bytes': disk_size_bytes,
+ 'disk_type': disk_type,
+ 'serial': serial,
+ 'alt_dev_name': '',
+ 'scsi_addr': scsi_addr,
+ 'enclosure_id': enclosure_id,
+ 'enclosure_slot': enclosure_slot,
+ 'path_id': disk_path_map.get(dev, ''),
+ 'mpath': mpath,
})
- return disk_list
+
+ # process the devices to drop duplicate physical devs based on matching
+ # the unique serial number
+ disk_list_unique: List[Dict[str, Any]] = []
+ serials_seen: List[str] = []
+ for dev in disk_list:
+ serial = str(dev['serial'])
+ if serial:
+ if serial in serials_seen:
+ continue
+ else:
+ serials_seen.append(serial)
+ devs = serial_num_lookup[serial].copy()
+ devs.remove(str(dev['dev_name']))
+ dev['alt_dev_name'] = ','.join(devs)
+ disk_list_unique.append(dev)
+
+ return disk_list_unique
@property
def hdd_list(self):
# type: () -> List[Dict[str, object]]
"""Return a list of devices that are HDDs (spinners)"""
- devs = self._get_devs_by_type(rota='1')
- return self._dev_list(devs)
+ return [dev for dev in self._device_list if dev['disk_type'] == 'hdd']
@property
def flash_list(self):
# type: () -> List[Dict[str, object]]
"""Return a list of devices that are flash based (SSD, NVMe)"""
- devs = self._get_devs_by_type(rota='0')
- return self._dev_list(devs)
+ return [dev for dev in self._device_list if dev['disk_type'] == 'flash']
@property
def hdd_capacity_bytes(self):
# type: () -> int
"""Return the total capacity for all HDD devices (bytes)"""
- return self._get_capacity_by_type(rota='1')
+ return self._get_capacity_by_type(disk_type='hdd')
@property
def hdd_capacity(self):
def flash_count(self):
# type: () -> int
"""Return the number of flash devices in the system (SSD, NVMe)"""
- return len(self._get_devs_by_type(rota='0'))
+ return len(self.flash_list)
@property
def flash_capacity_bytes(self):
# type: () -> int
"""Return the total capacity for all flash devices (bytes)"""
- return self._get_capacity_by_type(rota='0')
+ return self._get_capacity_by_type(disk_type='flash')
@property
def flash_capacity(self):