From: Adam King
Date: Tue, 31 May 2022 20:22:49 +0000 (-0400)
Subject: mgr/cephadm: support for os tuning profiles
X-Git-Tag: v17.2.6~21^2~83^2~2
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=26d5f9230b57635ea46aa9a3367bbae28ac1c389;p=ceph.git

mgr/cephadm: support for os tuning profiles

Fixes: https://tracker.ceph.com/issues/55819
Signed-off-by: Adam King

(cherry picked from commit 91e6e80ce38031d5eba148efd7c4aaede09021ac)

Conflicts:
	src/python-common/ceph/deployment/service_spec.py
---
diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py
index b3a76eaca70d1..7a88258f002dc 100644
--- a/src/pybind/mgr/cephadm/inventory.py
+++ b/src/pybind/mgr/cephadm/inventory.py
@@ -11,7 +11,7 @@ from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Se
 
 import orchestrator
 from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec
 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
 from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
@@ -405,6 +405,77 @@ class ClientKeyringStore():
             self.save()
 
 
+class TunedProfileStore():
+    """
+    Store for our tuned profile information
+    """
+
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        self.mgr: CephadmOrchestrator = mgr
+        self.mgr = mgr
+        self.profiles: Dict[str, TunedProfileSpec] = {}
+
+    def __contains__(self, profile: str) -> bool:
+        return profile in self.profiles
+
+    def load(self) -> None:
+        c = self.mgr.get_store('tuned_profiles') or b'{}'
+        j = json.loads(c)
+        for k, v in j.items():
+            self.profiles[k] = TunedProfileSpec.from_json(v)
+            self.profiles[k]._last_updated = datetime_to_str(datetime_now())
+
+    def save(self) -> None:
+        profiles_json = {k: v.to_json() for k, v in self.profiles.items()}
+        self.mgr.set_store('tuned_profiles', json.dumps(profiles_json))
+
+    def add_setting(self, profile: str, setting: str, value: str) -> None:
+        if profile in self.profiles:
+            self.profiles[profile].settings[setting] = value
+            self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
+            self.save()
+        else:
+            logger.error(
+                f'Attempted to set setting "{setting}" for nonexistent os tuning profile "{profile}"')
+
+    def rm_setting(self, profile: str, setting: str) -> None:
+        if profile in self.profiles:
+            if setting in self.profiles[profile].settings:
+                self.profiles[profile].settings.pop(setting, '')
+                self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
+                self.save()
+            else:
+                logger.error(
+                    f'Attempted to remove nonexistent setting "{setting}" from os tuning profile "{profile}"')
+        else:
+            logger.error(
+                f'Attempted to remove setting "{setting}" from nonexistent os tuning profile "{profile}"')
+
+    def add_profile(self, spec: TunedProfileSpec) -> None:
+        spec._last_updated = datetime_to_str(datetime_now())
+        self.profiles[spec.profile_name] = spec
+        self.save()
+
+    def rm_profile(self, profile: str) -> None:
+        if profile in self.profiles:
+            self.profiles.pop(profile, TunedProfileSpec(''))
+        else:
+            logger.error(f'Attempted to remove nonexistent os tuning profile "{profile}"')
+        self.save()
+
+    def last_updated(self, profile: str) -> Optional[datetime.datetime]:
+        if profile not in self.profiles or not self.profiles[profile]._last_updated:
+            return None
+        return 
str_to_datetime(self.profiles[profile]._last_updated) + + def set_last_updated(self, profile: str, new_datetime: datetime.datetime) -> None: + if profile in self.profiles: + self.profiles[profile]._last_updated = datetime_to_str(new_datetime) + + def list_profiles(self) -> List[TunedProfileSpec]: + return [p for p in self.profiles.values()] + + class HostCache(): """ HostCache stores different things: @@ -451,6 +522,7 @@ class HostCache(): self.last_network_update = {} # type: Dict[str, datetime.datetime] self.last_device_update = {} # type: Dict[str, datetime.datetime] self.last_device_change = {} # type: Dict[str, datetime.datetime] + self.last_tuned_profile_update = {} # type: Dict[str, datetime.datetime] self.daemon_refresh_queue = [] # type: List[str] self.device_refresh_queue = [] # type: List[str] self.network_refresh_queue = [] # type: List[str] @@ -515,6 +587,9 @@ class HostCache(): } if 'last_host_check' in j: self.last_host_check[host] = str_to_datetime(j['last_host_check']) + if 'last_tuned_profile_update' in j: + self.last_tuned_profile_update[host] = str_to_datetime( + j['last_tuned_profile_update']) self.registry_login_queue.add(host) self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {}) self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False) @@ -697,6 +772,8 @@ class HostCache(): j['last_network_update'] = datetime_to_str(self.last_network_update[host]) if host in self.last_device_change: j['last_device_change'] = datetime_to_str(self.last_device_change[host]) + if host in self.last_tuned_profile_update: + j['last_tuned_profile_update'] = datetime_to_str(self.last_tuned_profile_update[host]) if host in self.daemons: for name, dd in self.daemons[host].items(): j['daemons'][name] = dd.to_json() @@ -816,6 +893,8 @@ class HostCache(): del self.last_network_update[host] if host in self.last_device_change: del self.last_device_change[host] + if host in self.last_tuned_profile_update: + del self.last_tuned_profile_update[host] if host in self.daemon_config_deps: del self.daemon_config_deps[host] if host in self.scheduled_daemon_actions: @@ -999,6 +1078,24 @@ class HostCache(): return True return False + def host_needs_tuned_profile_update(self, host: str, profile: str) -> bool: + if host in self.mgr.offline_hosts: + logger.debug(f'Host "{host}" marked as offline. Cannot apply tuned profile') + return False + if profile not in self.mgr.tuned_profiles: + logger.debug( + f'Cannot apply tuned profile {profile} on host {host}. Profile does not exist') + return False + if host not in self.last_tuned_profile_update: + return True + last_profile_update = self.mgr.tuned_profiles.last_updated(profile) + if last_profile_update is None: + self.mgr.tuned_profiles.set_last_updated(profile, datetime_now()) + return True + if self.last_tuned_profile_update[host] < last_profile_update: + return True + return False + def host_had_daemon_refresh(self, host: str) -> bool: """ ... at least once. 
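For reference, the TunedProfileStore added above persists every profile as JSON under a single mgr key/value entry named 'tuned_profiles', and HostCache.last_tuned_profile_update is what later lets the serve loop skip hosts whose sysctl files are already current. A minimal sketch of that stored shape, using only the to_json()/from_json() fields defined for TunedProfileSpec later in this patch (the profile name and sysctl settings below are illustrative, not taken from the patch):

    import json

    # Illustrative spec; any sysctl key/value pairs may appear under 'settings'.
    example = {
        'profile_name': 'example-net-tuning',      # required, must be a string
        'placement': {'host_pattern': '*'},        # PlacementSpec.to_json() output
        'settings': {'net.ipv4.tcp_rmem': '4096 87380 16777216'},
    }

    # TunedProfileStore.save() stores {profile_name: spec.to_json()} as one JSON blob:
    blob = json.dumps({example['profile_name']: example})

    # TunedProfileStore.load() reads that blob back and rebuilds each spec
    # (via TunedProfileSpec.from_json) keyed by profile name:
    profiles = json.loads(blob)
    assert 'example-net-tuning' in profiles
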
diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 42200a7d04b09..d9f98b6e00fbb 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -25,7 +25,8 @@ from ceph.deployment import inventory from ceph.deployment.drive_group import DriveGroupSpec from ceph.deployment.service_spec import \ ServiceSpec, PlacementSpec, \ - HostPlacementSpec, IngressSpec + HostPlacementSpec, IngressSpec, \ + TunedProfileSpec from ceph.utils import str_to_datetime, datetime_to_str, datetime_now from cephadm.serve import CephadmServe from cephadm.services.cephadmservice import CephadmDaemonDeploySpec @@ -56,13 +57,14 @@ from .services.monitoring import GrafanaService, AlertmanagerService, Prometheus NodeExporterService, SNMPGatewayService, LokiService, PromtailService from .schedule import HostAssignment from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \ - ClientKeyringStore, ClientKeyringSpec + ClientKeyringStore, ClientKeyringSpec, TunedProfileStore from .upgrade import CephadmUpgrade from .template import TemplateMgr from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \ cephadmNoImage, CEPH_UPGRADE_ORDER from .configchecks import CephadmConfigChecks from .offline_watcher import OfflineHostWatcher +from .tuned_profiles import TunedProfileUtils try: import asyncssh @@ -505,6 +507,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.keys = ClientKeyringStore(self) self.keys.load() + self.tuned_profiles = TunedProfileStore(self) + self.tuned_profiles.load() + + self.tuned_profile_utils = TunedProfileUtils(self) + # ensure the host lists are in sync for h in self.inventory.keys(): if h not in self.cache.daemons: @@ -2438,6 +2445,46 @@ Then run the following: return self._apply_service_spec(cast(ServiceSpec, spec)) + @handle_orch_error + def apply_tuned_profiles(self, specs: List[TunedProfileSpec]) -> str: + outs = [] + for spec in specs: + self.tuned_profiles.add_profile(spec) + outs.append(f'Saved tuned profile {spec.profile_name}') + self._kick_serve_loop() + return '\n'.join(outs) + + @handle_orch_error + def rm_tuned_profile(self, profile_name: str) -> str: + if profile_name not in self.tuned_profiles: + raise OrchestratorError( + f'Tuned profile {profile_name} does not exist. Nothing to remove.') + self.tuned_profiles.rm_profile(profile_name) + self._kick_serve_loop() + return f'Removed tuned profile {profile_name}' + + @handle_orch_error + def tuned_profile_ls(self) -> List[TunedProfileSpec]: + return self.tuned_profiles.list_profiles() + + @handle_orch_error + def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> str: + if profile_name not in self.tuned_profiles: + raise OrchestratorError( + f'Tuned profile {profile_name} does not exist. Cannot add setting.') + self.tuned_profiles.add_setting(profile_name, setting, value) + self._kick_serve_loop() + return f'Added setting {setting} with value {value} to tuned profile {profile_name}' + + @handle_orch_error + def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str: + if profile_name not in self.tuned_profiles: + raise OrchestratorError( + f'Tuned profile {profile_name} does not exist. 
Cannot remove setting.') + self.tuned_profiles.rm_setting(profile_name, setting) + self._kick_serve_loop() + return f'Removed setting {setting} from tuned profile {profile_name}' + def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None: self.health_checks[name] = { 'severity': 'warning', diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index a9c6390c91531..a1ee9cfccbce1 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -519,6 +519,7 @@ class CephadmServe: len(self.mgr.apply_spec_fails), warnings) self.mgr.update_watched_hosts() + self.mgr.tuned_profile_utils._write_all_tuned_profiles() return r def _apply_service_config(self, spec: ServiceSpec) -> None: diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py index 7a4ac0d873cd7..0567f7f7e68a0 100644 --- a/src/pybind/mgr/cephadm/tests/fixtures.py +++ b/src/pybind/mgr/cephadm/tests/fixtures.py @@ -99,7 +99,8 @@ def with_cephadm_module(module_options=None, store=None): mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \ mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \ mock.patch('cephadm.agent.CherryPyThread.run'), \ - mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'): + mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'), \ + mock.patch('cephadm.tuned_profiles.TunedProfileUtils._remove_stray_tuned_profiles'): m = CephadmOrchestrator.__new__(CephadmOrchestrator) if module_options is not None: diff --git a/src/pybind/mgr/cephadm/tuned_profiles.py b/src/pybind/mgr/cephadm/tuned_profiles.py new file mode 100644 index 0000000000000..f07f8f3e48fbc --- /dev/null +++ b/src/pybind/mgr/cephadm/tuned_profiles.py @@ -0,0 +1,75 @@ +import logging +from typing import Dict, List, TYPE_CHECKING +from ceph.utils import datetime_now +from .schedule import HostAssignment +from ceph.deployment.service_spec import ServiceSpec, TunedProfileSpec + +if TYPE_CHECKING: + from cephadm.module import CephadmOrchestrator + +logger = logging.getLogger(__name__) + +SYSCTL_DIR = '/etc/sysctl.d' + + +class TunedProfileUtils(): + def __init__(self, mgr: "CephadmOrchestrator") -> None: + self.mgr = mgr + + def _profile_to_str(self, p: TunedProfileSpec) -> str: + p_str = f'# created by cephadm\n# tuned profile "{p.profile_name}"\n\n' + for k, v in p.settings.items(): + p_str += f'{k} = {v}\n' + return p_str + + def _write_all_tuned_profiles(self) -> None: + host_profile_mapping: Dict[str, List[Dict[str, str]]] = {} + for host in self.mgr.cache.get_hosts(): + host_profile_mapping[host] = [] + + for profile in self.mgr.tuned_profiles.list_profiles(): + p_str = self._profile_to_str(profile) + ha = HostAssignment( + spec=ServiceSpec( + 'crash', placement=profile.placement), + hosts=self.mgr.cache.get_schedulable_hosts(), + unreachable_hosts=self.mgr.cache.get_unreachable_hosts(), + daemons=[], + networks=self.mgr.cache.networks, + ) + all_slots, _, _ = ha.place() + for host in {s.hostname for s in all_slots}: + host_profile_mapping[host].append({profile.profile_name: p_str}) + + for host, profiles in host_profile_mapping.items(): + self._remove_stray_tuned_profiles(host, profiles) + self._write_tuned_profiles(host, profiles) + + def _remove_stray_tuned_profiles(self, host: str, profiles: List[Dict[str, str]]) -> None: + cmd = ['ls', SYSCTL_DIR] + found_files = self.mgr.ssh.check_execute_command(host, cmd).split('\n') + found_files = [s.strip() for s in 
found_files] + updated = False + for file in found_files: + if '-cephadm-tuned-profile.conf' not in file: + continue + if not any(file.split('-')[0] in p.keys() for p in profiles): + logger.info(f'Removing stray tuned profile file {file}') + cmd = ['rm', '-f', f'{SYSCTL_DIR}/{file}'] + self.mgr.ssh.check_execute_command(host, cmd) + updated = True + if updated: + self.mgr.ssh.check_execute_command(host, ['sysctl', '--system']) + + def _write_tuned_profiles(self, host: str, profiles: List[Dict[str, str]]) -> None: + updated = False + for p in profiles: + for profile_name, content in p.items(): + if self.mgr.cache.host_needs_tuned_profile_update(host, profile_name): + logger.info(f'Writing tuned profile {profile_name} to host {host}') + profile_filename: str = f'{SYSCTL_DIR}/{profile_name}-cephadm-tuned-profile.conf' + self.mgr.ssh.write_remote_file(host, profile_filename, content.encode('utf-8')) + updated = True + if updated: + self.mgr.ssh.check_execute_command(host, ['sysctl', '--system']) + self.mgr.cache.last_tuned_profile_update[host] = datetime_now() diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index c9414d797bb25..c1f95ac302283 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -31,7 +31,7 @@ import yaml from ceph.deployment import inventory from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \ - IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec + IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec, TunedProfileSpec from ceph.deployment.drive_group import DriveGroupSpec from ceph.deployment.hostspec import HostSpec, SpecValidationError from ceph.utils import datetime_to_str, str_to_datetime @@ -668,6 +668,26 @@ class Orchestrator(object): """Update an existing snmp gateway service""" raise NotImplementedError() + def apply_tuned_profiles(self, specs: List[TunedProfileSpec]) -> OrchResult[str]: + """Add or update an existing tuned profile""" + raise NotImplementedError() + + def rm_tuned_profile(self, profile_name: str) -> OrchResult[str]: + """Remove a tuned profile""" + raise NotImplementedError() + + def tuned_profile_ls(self) -> OrchResult[List[TunedProfileSpec]]: + """See current tuned profiles""" + raise NotImplementedError() + + def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> OrchResult[str]: + """Change/Add a specific setting for a tuned profile""" + raise NotImplementedError() + + def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> OrchResult[str]: + """Remove a specific setting for a tuned profile""" + raise NotImplementedError() + def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]: raise NotImplementedError() diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index 0dd5c23f71248..13808a4da7f74 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -29,7 +29,7 @@ from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_comma NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \ RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \ ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, \ - GenericSpec, DaemonDescriptionStatus, SNMPGatewaySpec, MDSSpec + GenericSpec, DaemonDescriptionStatus, SNMPGatewaySpec, MDSSpec, TunedProfileSpec def nice_delta(now: datetime.datetime, t: 
Optional[datetime.datetime], suffix: str = '') -> str: @@ -1352,6 +1352,84 @@ Usage: output += f"\nHost Parallelism: {result['workers']}" return HandleCommandResult(stdout=output) + @_cli_write_command('orch tuned-profile apply') + def _apply_tuned_profiles(self, + profile_name: Optional[str] = None, + placement: Optional[str] = None, + settings: Optional[str] = None, + no_overwrite: bool = False, + inbuf: Optional[str] = None) -> HandleCommandResult: + """Add or update a tuned profile""" + usage = """Usage: + ceph orch tuned-profile apply -i + ceph orch tuned-profile apply [--placement=] [--settings='option=value,option2=value2'] + """ + if inbuf: + if profile_name or placement or settings: + raise OrchestratorValidationError(usage) + yaml_objs: Iterator = yaml.safe_load_all(inbuf) + specs: List[TunedProfileSpec] = [] + # YAML '---' document separator with no content generates + # None entries in the output. Let's skip them silently. + content = [o for o in yaml_objs if o is not None] + for spec in content: + specs.append(TunedProfileSpec.from_json(spec)) + else: + if not profile_name: + raise OrchestratorValidationError(usage) + placement_spec = PlacementSpec.from_string( + placement) if placement else PlacementSpec(host_pattern='*') + settings_dict = {} + if settings: + settings_list = settings.split(',') + for setting in settings_list: + if '=' not in setting: + raise SpecValidationError('settings defined on cli for tuned profile must ' + + 'be of format "setting_name=value,setting_name2=value2" etc.') + name, value = setting.split('=', 1) + settings_dict[name.strip()] = value.strip() + tuned_profile_spec = TunedProfileSpec( + profile_name=profile_name, placement=placement_spec, settings=settings_dict) + specs = [tuned_profile_spec] + completion = self.apply_tuned_profiles(specs) + res = raise_if_exception(completion) + return HandleCommandResult(stdout=res) + + @_cli_write_command('orch tuned-profile rm') + def _rm_tuned_profiles(self, profile_name: str) -> HandleCommandResult: + completion = self.rm_tuned_profile(profile_name) + res = raise_if_exception(completion) + return HandleCommandResult(stdout=res) + + @_cli_read_command('orch tuned-profile ls') + def _tuned_profile_ls(self, format: Format = Format.plain) -> HandleCommandResult: + completion = self.tuned_profile_ls() + profiles: List[TunedProfileSpec] = raise_if_exception(completion) + if format != Format.plain: + return HandleCommandResult(stdout=to_format(profiles, format, many=True, cls=TunedProfileSpec)) + else: + out = '' + for profile in profiles: + out += f'profile_name: {profile.profile_name}\n' + out += f'placement: {profile.placement.pretty_str()}\n' + out += 'settings:\n' + for k, v in profile.settings.items(): + out += f' {k}: {v}\n' + out += '---\n' + return HandleCommandResult(stdout=out) + + @_cli_write_command('orch tuned-profile add-setting') + def _tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> HandleCommandResult: + completion = self.tuned_profile_add_setting(profile_name, setting, value) + res = raise_if_exception(completion) + return HandleCommandResult(stdout=res) + + @_cli_write_command('orch tuned-profile rm-setting') + def _tuned_profile_rm_setting(self, profile_name: str, setting: str) -> HandleCommandResult: + completion = self.tuned_profile_rm_setting(profile_name, setting) + res = raise_if_exception(completion) + return HandleCommandResult(stdout=res) + def self_test(self) -> None: old_orch = self._select_orchestrator() self._set_backend('') diff --git 
a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py index c23783c5da0e8..a663413230103 100644 --- a/src/python-common/ceph/deployment/service_spec.py +++ b/src/python-common/ceph/deployment/service_spec.py @@ -1355,3 +1355,55 @@ class MDSSpec(ServiceSpec): yaml.add_representer(MDSSpec, ServiceSpec.yaml_representer) + + +class TunedProfileSpec(): + def __init__(self, + profile_name: str, + placement: Optional[PlacementSpec] = None, + settings: Optional[Dict[str, str]] = None, + ): + self.profile_name = profile_name + self.placement = placement or PlacementSpec(host_pattern='*') + self.settings = settings or {} + self._last_updated: str = '' + + @classmethod + def from_json(cls, spec: Dict[str, Any]) -> 'TunedProfileSpec': + data = {} + if 'profile_name' not in spec: + raise SpecValidationError('Tuned profile spec must include "profile_name" field') + data['profile_name'] = spec['profile_name'] + if not isinstance(data['profile_name'], str): + raise SpecValidationError('"profile_name" field must be a string') + if 'placement' in spec: + data['placement'] = PlacementSpec.from_json(spec['placement']) + if 'settings' in spec: + data['settings'] = spec['settings'] + return cls(**data) + + def to_json(self) -> Dict[str, Any]: + res: Dict[str, Any] = {} + res['profile_name'] = self.profile_name + res['placement'] = self.placement.to_json() + res['settings'] = self.settings + return res + + def __eq__(self, other: Any) -> bool: + if isinstance(other, TunedProfileSpec): + if ( + self.placement == other.placement + and self.profile_name == other.profile_name + and self.settings == other.settings + ): + return True + return False + return NotImplemented + + def __repr__(self) -> str: + return f'TunedProfile({self.profile_name})' + + def copy(self) -> 'TunedProfileSpec': + # for making deep copies so you can edit the settings in one without affecting the other + # mostly for testing purposes + return TunedProfileSpec(self.profile_name, self.placement, self.settings.copy())
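
Taken together, the pieces above work as follows: profiles are managed with the new "ceph orch tuned-profile apply / ls / rm / add-setting / rm-setting" commands, where apply accepts either a YAML spec via -i (documents with a required profile_name, an optional placement, and a settings map) or a profile name plus --placement and --settings='option=value,option2=value2' on the command line. On each serve loop iteration, TunedProfileUtils._write_all_tuned_profiles() resolves each profile's placement with HostAssignment, writes the profile to /etc/sysctl.d/<profile_name>-cephadm-tuned-profile.conf on every matching host, removes stray cephadm-managed conf files left behind by deleted profiles, and runs "sysctl --system" when anything changed. As a rough sketch of the file one profile produces, mirroring _profile_to_str() (the profile name and settings here are made up for illustration):

    # Illustrative profile; not part of the patch.
    profile_name = 'example-osd-host-profile'
    settings = {'fs.aio-max-nr': '1048576', 'vm.swappiness': '10'}

    # Mirrors TunedProfileUtils._profile_to_str(): a comment header followed by
    # one "key = value" line per sysctl setting.
    content = f'# created by cephadm\n# tuned profile "{profile_name}"\n\n'
    for k, v in settings.items():
        content += f'{k} = {v}\n'

    # Destination on every host matched by the profile's placement spec:
    path = f'/etc/sysctl.d/{profile_name}-cephadm-tuned-profile.conf'
    print(path)
    print(content, end='')

After writing (or pruning) profile files on a host, cephadm records the time in HostCache.last_tuned_profile_update, so profiles that have not changed are not rewritten on the next pass.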