import orchestrator
from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
self.save()
+class TunedProfileStore():
+ """
+ Store for tuned profile information
+ """
+
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr: CephadmOrchestrator = mgr
+ self.profiles: Dict[str, TunedProfileSpec] = {}
+
+ def __contains__(self, profile: str) -> bool:
+ return profile in self.profiles
+
+ def load(self) -> None:
+ c = self.mgr.get_store('tuned_profiles') or b'{}'
+ j = json.loads(c)
+ for k, v in j.items():
+ self.profiles[k] = TunedProfileSpec.from_json(v)
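+ # _last_updated is not serialized in to_json(), so stamp each profile
+ # as just-updated on load; host timestamps restored from the cache will
+ # predate this, causing profiles to be rewritten after a mgr restart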
+ self.profiles[k]._last_updated = datetime_to_str(datetime_now())
+
+ def save(self) -> None:
+ profiles_json = {k: v.to_json() for k, v in self.profiles.items()}
+ self.mgr.set_store('tuned_profiles', json.dumps(profiles_json))
+
+ def add_setting(self, profile: str, setting: str, value: str) -> None:
+ if profile in self.profiles:
+ self.profiles[profile].settings[setting] = value
+ self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
+ self.save()
+ else:
+ logger.error(
+ f'Attempted to set setting "{setting}" for nonexistent os tuning profile "{profile}"')
+
+ def rm_setting(self, profile: str, setting: str) -> None:
+ if profile in self.profiles:
+ if setting in self.profiles[profile].settings:
+ self.profiles[profile].settings.pop(setting)
+ self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
+ self.save()
+ else:
+ logger.error(
+ f'Attempted to remove nonexistent setting "{setting}" from os tuning profile "{profile}"')
+ else:
+ logger.error(
+ f'Attempted to remove setting "{setting}" from nonexistent os tuning profile "{profile}"')
+
+ def add_profile(self, spec: TunedProfileSpec) -> None:
+ spec._last_updated = datetime_to_str(datetime_now())
+ self.profiles[spec.profile_name] = spec
+ self.save()
+
+ def rm_profile(self, profile: str) -> None:
+ if profile in self.profiles:
+ self.profiles.pop(profile)
+ else:
+ logger.error(f'Attempted to remove nonexistent os tuning profile "{profile}"')
+ self.save()
+
+ def last_updated(self, profile: str) -> Optional[datetime.datetime]:
+ if profile not in self.profiles or not self.profiles[profile]._last_updated:
+ return None
+ return str_to_datetime(self.profiles[profile]._last_updated)
+
+ def set_last_updated(self, profile: str, new_datetime: datetime.datetime) -> None:
+ if profile in self.profiles:
+ self.profiles[profile]._last_updated = datetime_to_str(new_datetime)
+
+ def list_profiles(self) -> List[TunedProfileSpec]:
+ return list(self.profiles.values())
+
+
class HostCache():
"""
HostCache stores different things:
self.last_network_update = {} # type: Dict[str, datetime.datetime]
self.last_device_update = {} # type: Dict[str, datetime.datetime]
self.last_device_change = {} # type: Dict[str, datetime.datetime]
+ self.last_tuned_profile_update = {} # type: Dict[str, datetime.datetime]
self.daemon_refresh_queue = [] # type: List[str]
self.device_refresh_queue = [] # type: List[str]
self.network_refresh_queue = [] # type: List[str]
}
if 'last_host_check' in j:
self.last_host_check[host] = str_to_datetime(j['last_host_check'])
+ if 'last_tuned_profile_update' in j:
+ self.last_tuned_profile_update[host] = str_to_datetime(
+ j['last_tuned_profile_update'])
self.registry_login_queue.add(host)
self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)
j['last_network_update'] = datetime_to_str(self.last_network_update[host])
if host in self.last_device_change:
j['last_device_change'] = datetime_to_str(self.last_device_change[host])
+ if host in self.last_tuned_profile_update:
+ j['last_tuned_profile_update'] = datetime_to_str(self.last_tuned_profile_update[host])
if host in self.daemons:
for name, dd in self.daemons[host].items():
j['daemons'][name] = dd.to_json()
del self.last_network_update[host]
if host in self.last_device_change:
del self.last_device_change[host]
+ if host in self.last_tuned_profile_update:
+ del self.last_tuned_profile_update[host]
if host in self.daemon_config_deps:
del self.daemon_config_deps[host]
if host in self.scheduled_daemon_actions:
return True
return False
+ def host_needs_tuned_profile_update(self, host: str, profile: str) -> bool:
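+ # a host needs the profile (re)written if it has never received a tuned
+ # profile update, or if the profile changed more recently than the
+ # host's last recorded update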
+ if host in self.mgr.offline_hosts:
+ logger.debug(f'Host "{host}" marked as offline. Cannot apply tuned profile')
+ return False
+ if profile not in self.mgr.tuned_profiles:
+ logger.debug(
+ f'Cannot apply tuned profile {profile} on host {host}. Profile does not exist')
+ return False
+ if host not in self.last_tuned_profile_update:
+ return True
+ last_profile_update = self.mgr.tuned_profiles.last_updated(profile)
+ if last_profile_update is None:
+ self.mgr.tuned_profiles.set_last_updated(profile, datetime_now())
+ return True
+ if self.last_tuned_profile_update[host] < last_profile_update:
+ return True
+ return False
+
def host_had_daemon_refresh(self, host: str) -> bool:
"""
... at least once.
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
ServiceSpec, PlacementSpec, \
- HostPlacementSpec, IngressSpec
+ HostPlacementSpec, IngressSpec, \
+ TunedProfileSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
NodeExporterService, SNMPGatewayService, LokiService, PromtailService
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
- ClientKeyringStore, ClientKeyringSpec
+ ClientKeyringStore, ClientKeyringSpec, TunedProfileStore
from .upgrade import CephadmUpgrade
from .template import TemplateMgr
from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
cephadmNoImage, CEPH_UPGRADE_ORDER
from .configchecks import CephadmConfigChecks
from .offline_watcher import OfflineHostWatcher
+from .tuned_profiles import TunedProfileUtils
try:
import asyncssh
self.keys = ClientKeyringStore(self)
self.keys.load()
+ self.tuned_profiles = TunedProfileStore(self)
+ self.tuned_profiles.load()
+
+ self.tuned_profile_utils = TunedProfileUtils(self)
+
# ensure the host lists are in sync
for h in self.inventory.keys():
if h not in self.cache.daemons:
return self._apply_service_spec(cast(ServiceSpec, spec))
+ @handle_orch_error
+ def apply_tuned_profiles(self, specs: List[TunedProfileSpec]) -> str:
+ outs = []
+ for spec in specs:
+ self.tuned_profiles.add_profile(spec)
+ outs.append(f'Saved tuned profile {spec.profile_name}')
+ self._kick_serve_loop()
+ return '\n'.join(outs)
+
+ @handle_orch_error
+ def rm_tuned_profile(self, profile_name: str) -> str:
+ if profile_name not in self.tuned_profiles:
+ raise OrchestratorError(
+ f'Tuned profile {profile_name} does not exist. Nothing to remove.')
+ self.tuned_profiles.rm_profile(profile_name)
+ self._kick_serve_loop()
+ return f'Removed tuned profile {profile_name}'
+
+ @handle_orch_error
+ def tuned_profile_ls(self) -> List[TunedProfileSpec]:
+ return self.tuned_profiles.list_profiles()
+
+ @handle_orch_error
+ def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> str:
+ if profile_name not in self.tuned_profiles:
+ raise OrchestratorError(
+ f'Tuned profile {profile_name} does not exist. Cannot add setting.')
+ self.tuned_profiles.add_setting(profile_name, setting, value)
+ self._kick_serve_loop()
+ return f'Added setting {setting} with value {value} to tuned profile {profile_name}'
+
+ @handle_orch_error
+ def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str:
+ if profile_name not in self.tuned_profiles:
+ raise OrchestratorError(
+ f'Tuned profile {profile_name} does not exist. Cannot remove setting.')
+ self.tuned_profiles.rm_setting(profile_name, setting)
+ self._kick_serve_loop()
+ return f'Removed setting {setting} from tuned profile {profile_name}'
+
def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
self.health_checks[name] = {
'severity': 'warning',
len(self.mgr.apply_spec_fails),
warnings)
self.mgr.update_watched_hosts()
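+ # push tuned profile files out to hosts on each pass of the serve loop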
+ self.mgr.tuned_profile_utils._write_all_tuned_profiles()
return r
def _apply_service_config(self, spec: ServiceSpec) -> None:
mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
mock.patch('cephadm.agent.CherryPyThread.run'), \
- mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'):
+ mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'), \
+ mock.patch('cephadm.tuned_profiles.TunedProfileUtils._remove_stray_tuned_profiles'):
m = CephadmOrchestrator.__new__(CephadmOrchestrator)
if module_options is not None:
--- /dev/null
+import logging
+from typing import Dict, List, TYPE_CHECKING
+from ceph.utils import datetime_now
+from .schedule import HostAssignment
+from ceph.deployment.service_spec import ServiceSpec, TunedProfileSpec
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+SYSCTL_DIR = '/etc/sysctl.d'
+
+
+class TunedProfileUtils():
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr = mgr
+
+ def _profile_to_str(self, p: TunedProfileSpec) -> str:
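+ # render the profile as sysctl.d conf file content, e.g. for an
+ # illustrative profile with one setting:
+ #   # created by cephadm
+ #   # tuned profile "example-profile"
+ #
+ #   vm.swappiness = 10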
+ p_str = f'# created by cephadm\n# tuned profile "{p.profile_name}"\n\n'
+ for k, v in p.settings.items():
+ p_str += f'{k} = {v}\n'
+ return p_str
+
+ def _write_all_tuned_profiles(self) -> None:
+ host_profile_mapping: Dict[str, List[Dict[str, str]]] = {}
+ for host in self.mgr.cache.get_hosts():
+ host_profile_mapping[host] = []
+
+ for profile in self.mgr.tuned_profiles.list_profiles():
+ p_str = self._profile_to_str(profile)
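+ # resolve the profile's placement to concrete hosts by reusing the
+ # scheduler; the 'crash' service name is just a placeholder spec to
+ # carry the placement, no daemons are scheduled here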
+ ha = HostAssignment(
+ spec=ServiceSpec(
+ 'crash', placement=profile.placement),
+ hosts=self.mgr.cache.get_schedulable_hosts(),
+ unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
+ daemons=[],
+ networks=self.mgr.cache.networks,
+ )
+ all_slots, _, _ = ha.place()
+ for host in {s.hostname for s in all_slots}:
+ host_profile_mapping[host].append({profile.profile_name: p_str})
+
+ for host, profiles in host_profile_mapping.items():
+ self._remove_stray_tuned_profiles(host, profiles)
+ self._write_tuned_profiles(host, profiles)
+
+ def _remove_stray_tuned_profiles(self, host: str, profiles: List[Dict[str, str]]) -> None:
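+ # remove any '*-cephadm-tuned-profile.conf' file in /etc/sysctl.d whose
+ # profile no longer places on this host, then reload with 'sysctl --system'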
+ cmd = ['ls', SYSCTL_DIR]
+ found_files = self.mgr.ssh.check_execute_command(host, cmd).split('\n')
+ found_files = [s.strip() for s in found_files]
+ updated = False
+ for file in found_files:
+ if not file.endswith('-cephadm-tuned-profile.conf'):
+ continue
+ # recover the profile name by stripping the filename suffix; splitting
+ # on '-' alone would break profile names that contain dashes
+ if not any(file.split('-cephadm-tuned-profile.conf')[0] in p.keys() for p in profiles):
+ logger.info(f'Removing stray tuned profile file {file}')
+ cmd = ['rm', '-f', f'{SYSCTL_DIR}/{file}']
+ self.mgr.ssh.check_execute_command(host, cmd)
+ updated = True
+ if updated:
+ self.mgr.ssh.check_execute_command(host, ['sysctl', '--system'])
+
+ def _write_tuned_profiles(self, host: str, profiles: List[Dict[str, str]]) -> None:
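+ # write any out-of-date profiles to the host; if anything changed, apply
+ # them via 'sysctl --system' and stamp the host's last update time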
+ updated = False
+ for p in profiles:
+ for profile_name, content in p.items():
+ if self.mgr.cache.host_needs_tuned_profile_update(host, profile_name):
+ logger.info(f'Writing tuned profile {profile_name} to host {host}')
+ profile_filename: str = f'{SYSCTL_DIR}/{profile_name}-cephadm-tuned-profile.conf'
+ self.mgr.ssh.write_remote_file(host, profile_filename, content.encode('utf-8'))
+ updated = True
+ if updated:
+ self.mgr.ssh.check_execute_command(host, ['sysctl', '--system'])
+ self.mgr.cache.last_tuned_profile_update[host] = datetime_now()
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
- IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
+ IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec, TunedProfileSpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec, SpecValidationError
from ceph.utils import datetime_to_str, str_to_datetime
"""Update an existing snmp gateway service"""
raise NotImplementedError()
+ def apply_tuned_profiles(self, specs: List[TunedProfileSpec]) -> OrchResult[str]:
+ """Add or update an existing tuned profile"""
+ raise NotImplementedError()
+
+ def rm_tuned_profile(self, profile_name: str) -> OrchResult[str]:
+ """Remove a tuned profile"""
+ raise NotImplementedError()
+
+ def tuned_profile_ls(self) -> OrchResult[List[TunedProfileSpec]]:
+ """See current tuned profiles"""
+ raise NotImplementedError()
+
+ def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> OrchResult[str]:
+ """Change/Add a specific setting for a tuned profile"""
+ raise NotImplementedError()
+
+ def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> OrchResult[str]:
+ """Remove a specific setting for a tuned profile"""
+ raise NotImplementedError()
+
def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
raise NotImplementedError()
NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \
RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, \
- GenericSpec, DaemonDescriptionStatus, SNMPGatewaySpec, MDSSpec
+ GenericSpec, DaemonDescriptionStatus, SNMPGatewaySpec, MDSSpec, TunedProfileSpec
def nice_delta(now: datetime.datetime, t: Optional[datetime.datetime], suffix: str = '') -> str:
output += f"\nHost Parallelism: {result['workers']}"
return HandleCommandResult(stdout=output)
+ @_cli_write_command('orch tuned-profile apply')
+ def _apply_tuned_profiles(self,
+ profile_name: Optional[str] = None,
+ placement: Optional[str] = None,
+ settings: Optional[str] = None,
+ no_overwrite: bool = False,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Add or update a tuned profile"""
+ usage = """Usage:
+ ceph orch tuned-profile apply -i <yaml spec>
+ ceph orch tuned-profile apply <profile_name> [--placement=<placement_string>] [--settings='option=value,option2=value2']
+ """
+ if inbuf:
+ if profile_name or placement or settings:
+ raise OrchestratorValidationError(usage)
+ yaml_objs: Iterator = yaml.safe_load_all(inbuf)
+ specs: List[TunedProfileSpec] = []
+ # YAML '---' document separator with no content generates
+ # None entries in the output. Let's skip them silently.
+ content = [o for o in yaml_objs if o is not None]
+ for spec in content:
+ specs.append(TunedProfileSpec.from_json(spec))
+ else:
+ if not profile_name:
+ raise OrchestratorValidationError(usage)
+ placement_spec = PlacementSpec.from_string(
+ placement) if placement else PlacementSpec(host_pattern='*')
+ settings_dict = {}
+ if settings:
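+ # parse 'name=value,name2=value2' pairs, splitting each pair on the
+ # first '=' only so values may themselves contain '='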
+ settings_list = settings.split(',')
+ for setting in settings_list:
+ if '=' not in setting:
+ raise SpecValidationError('settings defined on the CLI for a tuned profile must '
+ + 'be of the format "setting_name=value,setting_name2=value2" etc.')
+ name, value = setting.split('=', 1)
+ settings_dict[name.strip()] = value.strip()
+ tuned_profile_spec = TunedProfileSpec(
+ profile_name=profile_name, placement=placement_spec, settings=settings_dict)
+ specs = [tuned_profile_spec]
+ completion = self.apply_tuned_profiles(specs)
+ res = raise_if_exception(completion)
+ return HandleCommandResult(stdout=res)
+
+ @_cli_write_command('orch tuned-profile rm')
+ def _rm_tuned_profiles(self, profile_name: str) -> HandleCommandResult:
+ completion = self.rm_tuned_profile(profile_name)
+ res = raise_if_exception(completion)
+ return HandleCommandResult(stdout=res)
+
+ @_cli_read_command('orch tuned-profile ls')
+ def _tuned_profile_ls(self, format: Format = Format.plain) -> HandleCommandResult:
+ completion = self.tuned_profile_ls()
+ profiles: List[TunedProfileSpec] = raise_if_exception(completion)
+ if format != Format.plain:
+ return HandleCommandResult(stdout=to_format(profiles, format, many=True, cls=TunedProfileSpec))
+ else:
+ out = ''
+ for profile in profiles:
+ out += f'profile_name: {profile.profile_name}\n'
+ out += f'placement: {profile.placement.pretty_str()}\n'
+ out += 'settings:\n'
+ for k, v in profile.settings.items():
+ out += f' {k}: {v}\n'
+ out += '---\n'
+ return HandleCommandResult(stdout=out)
+
+ @_cli_write_command('orch tuned-profile add-setting')
+ def _tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> HandleCommandResult:
+ completion = self.tuned_profile_add_setting(profile_name, setting, value)
+ res = raise_if_exception(completion)
+ return HandleCommandResult(stdout=res)
+
+ @_cli_write_command('orch tuned-profile rm-setting')
+ def _tuned_profile_rm_setting(self, profile_name: str, setting: str) -> HandleCommandResult:
+ completion = self.tuned_profile_rm_setting(profile_name, setting)
+ res = raise_if_exception(completion)
+ return HandleCommandResult(stdout=res)
+
def self_test(self) -> None:
old_orch = self._select_orchestrator()
self._set_backend('')
yaml.add_representer(MDSSpec, ServiceSpec.yaml_representer)
+
+
+class TunedProfileSpec():
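+ """
+ A single tuned profile: a profile name, a placement, and a dict of
+ sysctl settings to apply on the matching hosts.
+
+ Illustrative YAML spec:
+
+ profile_name: mon-host-profile
+ placement:
+   label: mon
+ settings:
+   fs.file-max: "1000000"
+ """
+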
+ def __init__(self,
+ profile_name: str,
+ placement: Optional[PlacementSpec] = None,
+ settings: Optional[Dict[str, str]] = None,
+ ):
+ self.profile_name = profile_name
+ self.placement = placement or PlacementSpec(host_pattern='*')
+ self.settings = settings or {}
+ self._last_updated: str = ''
+
+ @classmethod
+ def from_json(cls, spec: Dict[str, Any]) -> 'TunedProfileSpec':
+ data = {}
+ if 'profile_name' not in spec:
+ raise SpecValidationError('Tuned profile spec must include "profile_name" field')
+ data['profile_name'] = spec['profile_name']
+ if not isinstance(data['profile_name'], str):
+ raise SpecValidationError('"profile_name" field must be a string')
+ if 'placement' in spec:
+ data['placement'] = PlacementSpec.from_json(spec['placement'])
+ if 'settings' in spec:
+ data['settings'] = spec['settings']
+ return cls(**data)
+
+ def to_json(self) -> Dict[str, Any]:
+ res: Dict[str, Any] = {}
+ res['profile_name'] = self.profile_name
+ res['placement'] = self.placement.to_json()
+ res['settings'] = self.settings
+ return res
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, TunedProfileSpec):
+ if (
+ self.placement == other.placement
+ and self.profile_name == other.profile_name
+ and self.settings == other.settings
+ ):
+ return True
+ return False
+ return NotImplemented
+
+ def __repr__(self) -> str:
+ return f'TunedProfile({self.profile_name})'
+
+ def copy(self) -> 'TunedProfileSpec':
+ # return a copy with its own settings dict so edits to one spec's settings
+ # don't affect the other (the placement object is shared); mostly for testing
+ return TunedProfileSpec(self.profile_name, self.placement, self.settings.copy())