# should only refresh if a change has been detected
self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
+ if spec.service_type == 'prometheus':
+ spec = cast(PrometheusSpec, spec)
+ if spec.retention_time:
+ valid_units = ['y', 'w', 'd', 'h', 'm', 's']
+ m = re.search(rf"^(\d+)({'|'.join(valid_units)})$", spec.retention_time)
+ if not m:
+ raise OrchestratorError(f"Invalid retention time. Valid units are: {', '.join(valid_units)}")
+ if spec.retention_size:
+ valid_units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB']
+ m = re.search(rf"^(\d+)({'|'.join(valid_units)})$", spec.retention_size)
+ if not m:
+ raise OrchestratorError(f"Invalid retention size. Valid units are: {', '.join(valid_units)}")
+
return self._apply_service_spec(cast(ServiceSpec, spec))
+ def _get_candidate_hosts(self, placement: PlacementSpec) -> List[str]:
+ """Return a list of candidate hosts according to the placement specification."""
+ all_hosts = self.cache.get_schedulable_hosts()
+ draining_hosts = [dh.hostname for dh in self.cache.get_draining_hosts()]
+ candidates = []
+ if placement.hosts:
+ candidates = [h.hostname for h in placement.hosts if h.hostname in placement.hosts]
+ elif placement.label:
+ candidates = [x.hostname for x in [h for h in all_hosts if placement.label in h.labels]]
+ elif placement.host_pattern:
+ candidates = [x for x in placement.filter_matching_hostspecs(all_hosts)]
+ elif (placement.count is not None or placement.count_per_host is not None):
+ candidates = [x.hostname for x in all_hosts]
+ return [h for h in candidates if h not in draining_hosts]
+
+ def _validate_one_shot_placement_spec(self, spec: PlacementSpec) -> None:
+ """Validate placement specification for TunedProfileSpec and ClientKeyringSpec."""
+ if spec.count is not None:
+ raise OrchestratorError(
+ "Placement 'count' field is no supported for this specification.")
+ if spec.count_per_host is not None:
+ raise OrchestratorError(
+ "Placement 'count_per_host' field is no supported for this specification.")
+ if spec.hosts:
+ all_hosts = [h.hostname for h in self.inventory.all_specs()]
+ invalid_hosts = [h.hostname for h in spec.hosts if h.hostname not in all_hosts]
+ if invalid_hosts:
+ raise OrchestratorError(f"Found invalid host(s) in placement section: {invalid_hosts}. "
+ f"Please check 'ceph orch host ls' for available hosts.")
+ elif not self._get_candidate_hosts(spec):
+ raise OrchestratorError("Invalid placement specification. No host(s) matched placement spec.\n"
+ "Please check 'ceph orch host ls' for available hosts.\n"
+ "Note: draining hosts are excluded from the candidate list.")
+
+ def _validate_tunedprofile_settings(self, spec: TunedProfileSpec) -> Dict[str, List[str]]:
+ candidate_hosts = spec.placement.filter_matching_hostspecs(self.inventory.all_specs())
+ invalid_options: Dict[str, List[str]] = {}
+ for host in candidate_hosts:
+ host_sysctl_options = self.cache.get_facts(host).get('sysctl_options', {})
+ invalid_options[host] = []
+ for option in spec.settings:
+ if option not in host_sysctl_options:
+ invalid_options[host].append(option)
+ return invalid_options
+
+ def _validate_tuned_profile_spec(self, spec: TunedProfileSpec) -> None:
+ if not spec.settings:
+ raise OrchestratorError("Invalid spec: settings section cannot be empty.")
+ self._validate_one_shot_placement_spec(spec.placement)
+ invalid_options = self._validate_tunedprofile_settings(spec)
+ if any(e for e in invalid_options.values()):
+ raise OrchestratorError(
+ f'Failed to apply tuned profile. Invalid sysctl option(s) for host(s) detected: {invalid_options}')
+
@handle_orch_error
def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool = False) -> str:
outs = []