)
continue
+ if not disk.available and disk.ceph_device and disk.lvs:
+ other_osdspec_affinity = ''
+ for lv in disk.lvs:
+ if lv['osdspec_affinity'] != self.spec.service_id:
+ other_osdspec_affinity = lv['osdspec_affinity']
+ break
+ if other_osdspec_affinity:
+ logger.debug("{} is already used in spec {}, "
+ "skipping it.".format(disk.path, other_osdspec_affinity))
+ continue
+
if not self._has_mandatory_idents(disk):
logger.debug(
"Ignoring disk {}. Missing mandatory idents".format(
sys_api=None, # type: Optional[Dict[str, Any]]
available=None, # type: Optional[bool]
rejected_reasons=None, # type: Optional[List[str]]
- lvs=None, # type: Optional[List[str]]
+ lvs=None, # type: Optional[List[Dict[str, str]]]
device_id=None, # type: Optional[str]
lsm_data=None, # type: Optional[Dict[str, Dict[str, str]]]
created=None, # type: Optional[datetime.datetime]
return 'hdd' if self.sys_api["rotational"] == "1" else 'ssd'
def __repr__(self) -> str:
- device_desc: Dict[str, Union[str, List[str]]] = {
+ device_desc: Dict[str, Union[str, List[str], List[Dict[str, str]]]] = {
'path': self.path if self.path is not None else 'unknown',
'lvs': self.lvs if self.lvs else 'None',
'available': str(self.available),