if r:
failures.append(r)
+ if self.cache.host_needs_osdspec_preview_refresh(host):
+ self.log.debug(f"refreshing OSDSpec previews for {host}")
+ r = self._refresh_host_osdspec_previews(host)
+ if r:
+ failures.append(r)
+
health_changed = False
if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
if health_changed:
self.set_health_checks(self.health_checks)
-
-
self._check_for_strays()
if self.paused:
self.log.info('Removed label %s to host %s' % (label, host))
return 'Removed label %s from host %s' % (label, host)
+ def _refresh_host_osdspec_previews(self, host) -> bool:
+ self.cache.update_osdspec_previews(host)
+ self.cache.save_host(host)
+ self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
+ return True
+
def _refresh_host_daemons(self, host):
try:
out, err, code = self._run_cephadm(
host, len(devices), len(networks)))
devices = inventory.Devices.from_json(devices)
self.cache.update_host_devices_networks(host, devices.devices, networks)
+ self.cache.update_osdspec_previews(host)
self.cache.save_host(host)
return None
@trivial_completion
def remove_service(self, service_name):
self.log.info('Remove service %s' % service_name)
+ self._trigger_preview_refresh(service_name=service_name)
found = self.spec_store.rm(service_name)
if found:
self._kick_serve_loop()
r[str(osd_id)] = o.get('uuid', '')
return r
+ def resolve_hosts_for_osdspecs(self,
+ specs: Optional[List[DriveGroupSpec]] = None,
+ service_name: Optional[str] = None
+ ) -> List[str]:
+ osdspecs = []
+ if service_name:
+ self.log.debug(f"Looking for OSDSpec with service_name: {service_name}")
+ osdspecs = self.spec_store.find(service_name=service_name)
+ osdspecs = [cast(DriveGroupSpec, spec) for spec in osdspecs]
+ self.log.debug(f"Found OSDSpecs: {osdspecs}")
+ if specs:
+ osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
+ if not service_name and not specs:
+ # if neither parameters are fulfilled, search for all available osdspecs
+ osdspecs = self.spec_store.find(service_name='osd')
+ self.log.debug(f"Found OSDSpecs: {osdspecs}")
+ if not osdspecs:
+ self.log.debug("No OSDSpecs found")
+ return []
+ return sum([spec.placement.pattern_matches_hosts(self.cache.get_hosts()) for spec in osdspecs], [])
+
+ def resolve_osdspecs_for_host(self, host):
+ matching_specs = []
+ self.log.debug(f"Finding OSDSpecs for host: <{host}>")
+ for spec in self.spec_store.find('osd'):
+ if host in spec.placement.pattern_matches_hosts(self.cache.get_hosts()):
+ self.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
+ matching_specs.append(spec)
+ return matching_specs
+
+ def _trigger_preview_refresh(self,
+ specs: Optional[List[DriveGroupSpec]] = None,
+ service_name: Optional[str] = None):
+ refresh_hosts = self.resolve_hosts_for_osdspecs(specs=specs, service_name=service_name)
+ for host in refresh_hosts:
+ self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
+ self.cache.osdspec_previews_refresh_queue.append(host)
+
@trivial_completion
def apply_drivegroups(self, specs: List[DriveGroupSpec]):
+ self._trigger_preview_refresh(specs=specs)
return [self._apply(spec) for spec in specs]
@trivial_completion
def create_osds(self, drive_group: DriveGroupSpec):
return self.osd_service.create_from_spec(drive_group)
- # @trivial_completion
- def preview_drivegroups(self, drive_group_name: Optional[str] = None,
- dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
- return self.osd_service.preview_drivegroups(drive_group_name, dg_specs)
+ def preview_osdspecs(self,
+ osdspec_name: Optional[str] = None,
+ osdspecs: Optional[List[DriveGroupSpec]] = None
+ ) -> Dict[str, List[Dict[str, Any]]]:
+ matching_hosts = self.resolve_hosts_for_osdspecs(specs=osdspecs, service_name=osdspec_name)
+ if not matching_hosts:
+ return {'n/a': [{'error': True,
+ 'message': 'No OSDSpec or matching hosts found.'}]}
+ # Is any host still loading previews
+ pending_flags = {f for (h, f) in self.cache.loading_osdspec_preview.items() if h in matching_hosts}
+ if any(pending_flags):
+ # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
+ return {'n/a': [{'error': True,
+ 'message': 'Preview data is being generated.. '
+ 'Please try again in a bit.'}]}
+ # drop all keys that are not in search_hosts and return preview struct
+ return {k: v for (k, v) in self.cache.osdspec_previews.items() if k in matching_hosts}
def _calc_daemon_deps(self, daemon_type, daemon_id):
need = {
else:
return "Created no osd(s) on host %s; already created?" % host
- def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[
- Tuple[str, DriveSelection]]:
+ def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
# 1) use fn_filter to determine matching_hosts
matching_hosts = drive_group.placement.pattern_matches_hosts(
[x for x in self.mgr.cache.get_hosts()])
logger.debug(f"Resulting ceph-volume cmd: {cmd}")
return cmd
- def preview_drivegroups(self, drive_group_name: Optional[str] = None,
- dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[
- Dict[str, Dict[Any, Any]]]:
- # find drivegroups
- if drive_group_name:
- drive_groups = cast(List[DriveGroupSpec],
- self.mgr.spec_store.find(service_name=drive_group_name))
- elif dg_specs:
- drive_groups = dg_specs
- else:
- drive_groups = []
- ret_all = []
- for drive_group in drive_groups:
- drive_group.osd_id_claims = self.find_destroyed_osds()
- logger.info(
- f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
+ def get_previews(self, host) -> List[Dict[str, Any]]:
+ # Find OSDSpecs that match host.
+ osdspecs = self.mgr.resolve_osdspecs_for_host(host)
+ return self.generate_previews(osdspecs)
+
+ def generate_previews(self, osdspecs: List[DriveGroupSpec]) -> List[Dict[str, Any]]:
+ """
+
+ The return should look like this:
+
+ [
+ {'data': {<metadata>},
+ 'osdspec': <name of osdspec>,
+ 'host': <name of host>
+ },
+
+ {'data': ...,
+ 'osdspec': ..,
+ 'host': ..
+ }
+ ]
+
+ Note: One host can have multiple previews based on its assigned OSDSpecs.
+ """
+ self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
+ ret_all: List[Dict[str, Any]] = []
+ if not osdspecs:
+ return ret_all
+ for osdspec in osdspecs:
+
+ # populate osd_id_claims
+ osdspec.osd_id_claims = self.find_destroyed_osds()
+
# prepare driveselection
- for host, ds in self.prepare_drivegroup(drive_group):
- cmd = self.driveselection_to_ceph_volume(drive_group, ds,
- drive_group.osd_id_claims.get(host,
- []),
+ for host, ds in self.prepare_drivegroup(osdspec):
+
+ # driveselection for host
+ cmd = self.driveselection_to_ceph_volume(osdspec,
+ ds,
+ osdspec.osd_id_claims.get(host, []),
preview=True)
if not cmd:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
- drive_group.service_name()))
+ osdspec.service_name()))
continue
+
+ # get preview data from ceph-volume
out, err, code = self._run_ceph_volume_command(host, cmd)
if out:
- concat_out = json.loads(" ".join(out))
- ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id,
+ concat_out: Dict[str, Any] = json.loads(" ".join(out))
+ ret_all.append({'data': concat_out,
+ 'osdspec': osdspec.service_id,
'host': host})
return ret_all
- def _run_ceph_volume_command(self, host: str, cmd: str, env_vars: Optional[List[str]] = None) -> Tuple[List[str], List[str], int]:
+ def _run_ceph_volume_command(self, host: str,
+ cmd: str, env_vars: Optional[List[str]] = None
+ ) -> Tuple[List[str], List[str], int]:
self.mgr.inventory.assert_host(host)
# get bootstrap key
osd_host_map.update(
{node.get('name'): [str(_id) for _id in node.get('children', list())]}
)
+ self.mgr.log.info(
+ f"Found osd claims -> {osd_host_map}")
return osd_host_map
out = cephadm_module.osd_service.driveselection_to_ceph_volume(dg, ds, [], preview)
assert out in exp_command
- @mock.patch("cephadm.module.SpecStore.find")
- @mock.patch("cephadm.services.osd.OSDService.prepare_drivegroup")
- @mock.patch("cephadm.services.osd.OSDService.driveselection_to_ceph_volume")
- @mock.patch("cephadm.services.osd.OSDService._run_ceph_volume_command")
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- def test_preview_drivegroups_str(self, _run_c_v_command, _ds_to_cv, _prepare_dg, _find_store, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
- dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
- _find_store.return_value = [dg]
- _prepare_dg.return_value = [('host1', 'ds_dummy')]
- _run_c_v_command.return_value = ("{}", '', 0)
- cephadm_module.osd_service.preview_drivegroups(drive_group_name='foo')
- _find_store.assert_called_once_with(service_name='foo')
- _prepare_dg.assert_called_once_with(dg)
- _run_c_v_command.assert_called_once()
-
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
json.dumps([
dict(