def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
logger.debug(f"Processing DriveGroup {drive_group}")
- osd_id_claims = self.find_destroyed_osds()
- if osd_id_claims:
+ osd_id_claims = OsdIdClaims(self.mgr)
+ if osd_id_claims.get():
logger.info(
- f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims}")
+ f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")
@forall_hosts
def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
if self.mgr.inventory.has_label(host, '_no_schedule'):
return None
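+ # osd ids claimed by previously destroyed osds on this host can be
+ # reused by the osds created below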
+ osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)
+
cmd = self.driveselection_to_ceph_volume(drive_selection,
- osd_id_claims.get(host, []))
+ osd_id_claims_for_host)
if not cmd:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
drive_group.service_id))
env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
ret_msg = self.create_single_host(
drive_group, host, cmd,
- replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
+ replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
)
self.mgr.cache.update_osdspec_last_applied(
host, drive_group.service_name(), start_ts
replace_osd_ids: Optional[List[str]] = None) -> str:
if replace_osd_ids is None:
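+ # no ids passed in by the caller; resolve current claims for this host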
- replace_osd_ids = self.find_destroyed_osds().get(host, [])
+ replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
assert replace_osd_ids is not None
# check result
osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
for osdspec in osdspecs:
# populate osd_id_claims
- osd_id_claims = self.find_destroyed_osds()
+ osd_id_claims = OsdIdClaims(self.mgr)
# prepare driveselection
for host, ds in self.prepare_drivegroup(osdspec):
# driveselection for host
cmd = self.driveselection_to_ceph_volume(ds,
- osd_id_claims.get(host, []),
+ osd_id_claims.filtered_by_host(host),
preview=True)
if not cmd:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
def get_osdspec_affinity(self, osd_id: str) -> str:
return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')
- def find_destroyed_osds(self) -> Dict[str, List[str]]:
- osd_host_map: Dict[str, List[str]] = dict()
+
+class OsdIdClaims(object):
+ """
+ Retrieve and provide osd ids that can be reused in the cluster
+ """
+
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr: "CephadmOrchestrator" = mgr
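+ # map of host -> osd ids that can be reclaimed on that host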
+ self.osd_host_map: Dict[str, List[str]] = dict()
+ self.refresh()
+
+ def refresh(self) -> None:
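+ # ask the mon for the osd tree and cache destroyed osd ids per host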
try:
ret, out, err = self.mgr.check_mon_command({
'prefix': 'osd tree',
tree = json.loads(out)
except ValueError:
logger.exception(f'Cannot decode JSON: \'{out}\'')
- return osd_host_map
+ return
nodes = tree.get('nodes', {})
for node in nodes:
if node.get('type') == 'host':
- osd_host_map.update(
+ self.osd_host_map.update(
{node.get('name'): [str(_id) for _id in node.get('children', list())]}
)
- if osd_host_map:
- self.mgr.log.info(f"Found osd claims -> {osd_host_map}")
- return osd_host_map
+ if self.osd_host_map:
+ self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")
+
+ def get(self) -> Dict[str, List[str]]:
+ return self.osd_host_map
+
+ def filtered_by_host(self, host: str) -> List[str]:
+ """
+ Return the list of osd ids that can be reused in a host
+
+ OSD id claims in CRUSH map are linked to the bare name of
+ the hostname. In case of FQDN hostnames the host is searched by the
+ bare name
+ """
+ return self.osd_host_map.get(host.split(".")[0], [])
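+
+ # Illustrative usage sketch (not part of the module); 'mgr' stands in
+ # for a CephadmOrchestrator instance:
+ #
+ #   claims = OsdIdClaims(mgr)                      # queries the osd tree once
+ #   claims.get()                                   # e.g. {'host1': ['0', '2']}
+ #   claims.filtered_by_host('host1.domain.com')    # -> ['0', '2']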
class RemoveUtil(object):
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from cephadm.serve import CephadmServe
-from cephadm.services.osd import OSD, OSDRemovalQueue
+from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims
try:
from typing import List
}
json_out = json.dumps(dict_out)
_mon_cmd.return_value = (0, json_out, '')
- out = cephadm_module.osd_service.find_destroyed_osds()
- assert out == {'host1': ['0']}
+ osd_claims = OsdIdClaims(cephadm_module)
+ assert osd_claims.get() == {'host1': ['0']}
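+ # bare and fully-qualified hostnames must resolve to the same claims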
+ assert osd_claims.filtered_by_host('host1') == ['0']
+ assert osd_claims.filtered_by_host('host1.domain.com') == ['0']
@ pytest.mark.parametrize(
"ceph_services, cephadm_daemons, strays_expected, metadata",
- def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
+ def test_osd_id_claims_cmd_failure(self, _mon_cmd, cephadm_module):
_mon_cmd.return_value = (1, "", "fail_msg")
with pytest.raises(OrchestratorError):
- cephadm_module.osd_service.find_destroyed_osds()
+ OsdIdClaims(cephadm_module)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):