import json
import logging
from textwrap import dedent
-from concurrent.futures import ThreadPoolExecutor
-
from ceph_volume import decorators, process
from ceph_volume.util import disk
from ceph_volume.api import lvm
self.argv = argv
self.info_devices: _List[Dict[str, str]] = []
self.devices_to_scan: _List[str] = []
+ self._lvs_by_realpath: Dict[str, _List[Any]] = {}
def exclude_invalid_devices(self, devices: _List[Dict[str, str]]) -> _List[Dict[str, str]]:
return [
def exclude_lvm_osd_devices(self) -> None:
lvm_mappers = disk.get_lvm_mappers()
- with ThreadPoolExecutor() as pool:
- filtered_devices_to_scan = pool.map(
- lambda device: self.filter_lvm_osd_devices(device, lvm_mappers),
- self.devices_to_scan
- )
- self.devices_to_scan = [device for device in filtered_devices_to_scan if device is not None]
+ self._lvs_by_realpath = self._build_lvs_by_realpath()
+ self.devices_to_scan = [
+ device
+ for device in self.devices_to_scan
+ if self.filter_lvm_osd_devices(device, lvm_mappers) is not None
+ ]
+
+ def _build_lvs_by_realpath(self) -> Dict[str, _List[Any]]:
+ result: Dict[str, _List[Any]] = {}
+ try:
+ for lv in lvm.get_lvs():
+ try:
+ rp = os.path.realpath(lv.lv_path)
+ result.setdefault(rp, []).append(lv)
+ except OSError as e:
+ logger.debug("Skipping LV %s: %s", lv.lv_path, e)
+ except RuntimeError as e:
+ logger.warning("Failed to list LVs while building LV-to-realpath map for raw device list: %s", e)
+ return result
def filter_lvm_osd_devices(self, device: str, lvm_mappers: _List[str]) -> Optional[str]:
real_path = os.path.realpath(device)
if real_path not in lvm_mappers:
return device
- # It's an LV, so we need to check if it's a Ceph device
- # This requires a subprocess call, but it's much lighter than Device()
- lvs = lvm.get_lvs_from_path(device)
+ lvs = self._lvs_by_realpath.get(real_path, [])
if lvs:
# Check if any LV has ceph.osd_id tag (making it a Ceph device)
for lv in lvs: