From: Sebastian Wagner
Date: Fri, 11 Sep 2020 11:26:46 +0000 (+0200)
Subject: mgr/cephadm: move _apply_service to serve.py
X-Git-Tag: v15.2.8~14^2~23
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3bffddb6411d5d7a690c2199706f99655035bd2f;p=ceph.git

mgr/cephadm: move _apply_service to serve.py

Signed-off-by: Sebastian Wagner
(cherry picked from commit 6b2664a9e3dd531d99b5cc7f01384298e0bdc055)
---

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index da93918d5b6d..1672f6abc87b 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -47,7 +47,7 @@ from .services.nfs import NFSService
 from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
     NodeExporterService
-from .schedule import HostAssignment, HostPlacementSpec
+from .schedule import HostAssignment
 from .inventory import Inventory, SpecStore, HostCache, EventStore
 from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
 from .template import TemplateMgr
@@ -1839,147 +1839,6 @@ To check that the host is reachable:
 
         return "Removed {} from host '{}'".format(name, host)
 
-    def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
-        fn = {
-            'mds': self.mds_service.config,
-            'rgw': self.rgw_service.config,
-            'nfs': self.nfs_service.config,
-            'iscsi': self.iscsi_service.config,
-        }.get(service_type)
-        return cast(Callable[[ServiceSpec], None], fn)
-
-    def _apply_service(self, spec: ServiceSpec) -> bool:
-        """
-        Schedule a service. Deploy new daemons or remove old ones, depending
-        on the target label and count specified in the placement.
-        """
-        self.migration.verify_no_migration()
-
-        daemon_type = spec.service_type
-        service_name = spec.service_name()
-        if spec.unmanaged:
-            self.log.debug('Skipping unmanaged service %s' % service_name)
-            return False
-        if spec.preview_only:
-            self.log.debug('Skipping preview_only service %s' % service_name)
-            return False
-        self.log.debug('Applying service %s spec' % service_name)
-
-        config_func = self._config_fn(daemon_type)
-
-        if daemon_type == 'osd':
-            self.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
-            # TODO: return True would result in a busy loop
-            # can't know if daemon count changed; create_from_spec doesn't
-            # return a solid indication
-            return False
-
-        daemons = self.cache.get_daemons_by_service(service_name)
-
-        public_network = None
-        if daemon_type == 'mon':
-            ret, out, err = self.check_mon_command({
-                'prefix': 'config get',
-                'who': 'mon',
-                'key': 'public_network',
-            })
-            if '/' in out:
-                public_network = out.strip()
-                self.log.debug('mon public_network is %s' % public_network)
-
-        def matches_network(host):
-            # type: (str) -> bool
-            if not public_network:
-                return False
-            # make sure we have 1 or more IPs for that network on that
-            # host
-            return len(self.cache.networks[host].get(public_network, [])) > 0
-
-        ha = HostAssignment(
-            spec=spec,
-            hosts=self._hosts_with_daemon_inventory(),
-            get_daemons_func=self.cache.get_daemons_by_service,
-            filter_new_host=matches_network if daemon_type == 'mon' else None,
-        )
-
-        hosts: List[HostPlacementSpec] = ha.place()
-        self.log.debug('Usable hosts: %s' % hosts)
-
-        r = None
-
-        # sanity check
-        if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
-            self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
-            return False
-
-        # add any?
-        did_config = False
-
-        add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
-        self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)
-
-        remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
-        self.log.debug('Hosts that will loose daemons: %s' % remove_daemon_hosts)
-
-        for host, network, name in add_daemon_hosts:
-            daemon_id = self.get_unique_name(daemon_type, host, daemons,
-                                             prefix=spec.service_id,
-                                             forcename=name)
-
-            if not did_config and config_func:
-                if daemon_type == 'rgw':
-                    rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
-                    rgw_config_func(cast(RGWSpec, spec), daemon_id)
-                else:
-                    config_func(spec)
-                did_config = True
-
-            daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
-                host, daemon_id, network, spec)
-            self.log.debug('Placing %s.%s on host %s' % (
-                daemon_type, daemon_id, host))
-
-            try:
-                daemon_spec = self.cephadm_services[daemon_type].prepare_create(daemon_spec)
-                self._create_daemon(daemon_spec)
-                r = True
-            except (RuntimeError, OrchestratorError) as e:
-                self.events.for_service(spec, 'ERROR',
-                                        f"Failed while placing {daemon_type}.{daemon_id}"
-                                        "on {host}: {e}")
-                # only return "no change" if no one else has already succeeded.
-                # later successes will also change to True
-                if r is None:
-                    r = False
-                continue
-
-            # add to daemon list so next name(s) will also be unique
-            sd = orchestrator.DaemonDescription(
-                hostname=host,
-                daemon_type=daemon_type,
-                daemon_id=daemon_id,
-            )
-            daemons.append(sd)
-
-        # remove any?
-        def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
-            daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
-            r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
-            return not r.retval
-
-        while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
-            # let's find a subset that is ok-to-stop
-            remove_daemon_hosts.pop()
-        for d in remove_daemon_hosts:
-            r = True
-            # NOTE: we are passing the 'force' flag here, which means
-            # we can delete a mon instances data.
-            self._remove_daemon(d.name(), d.hostname)
-
-        if r is None:
-            r = False
-        return r
-
     def _check_pool_exists(self, pool, service_name):
         logger.info(f'Checking pool "{pool}" exists for service {service_name}')
         if not self.rados.pool_exists(pool):
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
index f00e61f2167a..186ba0678232 100644
--- a/src/pybind/mgr/cephadm/serve.py
+++ b/src/pybind/mgr/cephadm/serve.py
@@ -2,7 +2,7 @@ import datetime
 import json
 import logging
 from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List
+from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set
 
 try:
     import remoto
@@ -10,9 +10,11 @@ except ImportError:
     remoto = None
 
 from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec
 
 import orchestrator
+from cephadm.schedule import HostAssignment
 from cephadm.utils import forall_hosts, cephadmNoImage, str_to_datetime
 from orchestrator import OrchestratorError
 
@@ -335,7 +337,8 @@ class CephadmServe:
                     name = '%s.%s' % (s.get('type'), s.get('id'))
                     if s.get('type') == 'rbd-mirror':
                         defaults = defaultdict(lambda: None, {'id': None})
-                        metadata = self.mgr.get_metadata("rbd-mirror", s.get('id'), default=defaults)
+                        metadata = self.mgr.get_metadata(
+                            "rbd-mirror", s.get('id'), default=defaults)
                         if metadata['id']:
                             name = '%s.%s' % (s.get('type'), metadata['id'])
                         else:
@@ -378,7 +381,7 @@ class CephadmServe:
             specs.append(spec)
         for spec in specs:
            try:
-                if self.mgr._apply_service(spec):
+                if self._apply_service(spec):
                     r = True
             except Exception as e:
                 self.log.exception('Failed to apply %s spec %s: %s' % (
@@ -386,3 +389,144 @@
                 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
 
         return r
+
+    def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
+        fn = {
+            'mds': self.mgr.mds_service.config,
+            'rgw': self.mgr.rgw_service.config,
+            'nfs': self.mgr.nfs_service.config,
+            'iscsi': self.mgr.iscsi_service.config,
+        }.get(service_type)
+        return cast(Callable[[ServiceSpec], None], fn)
+
+    def _apply_service(self, spec: ServiceSpec) -> bool:
+        """
+        Schedule a service. Deploy new daemons or remove old ones, depending
+        on the target label and count specified in the placement.
+        """
+        self.mgr.migration.verify_no_migration()
+
+        daemon_type = spec.service_type
+        service_name = spec.service_name()
+        if spec.unmanaged:
+            self.log.debug('Skipping unmanaged service %s' % service_name)
+            return False
+        if spec.preview_only:
+            self.log.debug('Skipping preview_only service %s' % service_name)
+            return False
+        self.log.debug('Applying service %s spec' % service_name)
+
+        config_func = self._config_fn(daemon_type)
+
+        if daemon_type == 'osd':
+            self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
+            # TODO: return True would result in a busy loop
+            # can't know if daemon count changed; create_from_spec doesn't
+            # return a solid indication
+            return False
+
+        daemons = self.mgr.cache.get_daemons_by_service(service_name)
+
+        public_network = None
+        if daemon_type == 'mon':
+            ret, out, err = self.mgr.check_mon_command({
+                'prefix': 'config get',
+                'who': 'mon',
+                'key': 'public_network',
+            })
+            if '/' in out:
+                public_network = out.strip()
+                self.log.debug('mon public_network is %s' % public_network)
+
+        def matches_network(host):
+            # type: (str) -> bool
+            if not public_network:
+                return False
+            # make sure we have 1 or more IPs for that network on that
+            # host
+            return len(self.mgr.cache.networks[host].get(public_network, [])) > 0
+
+        ha = HostAssignment(
+            spec=spec,
+            hosts=self.mgr._hosts_with_daemon_inventory(),
+            get_daemons_func=self.mgr.cache.get_daemons_by_service,
+            filter_new_host=matches_network if daemon_type == 'mon' else None,
+        )
+
+        hosts: List[HostPlacementSpec] = ha.place()
+        self.log.debug('Usable hosts: %s' % hosts)
+
+        r = None
+
+        # sanity check
+        if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
+            self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
+            return False
+
+        # add any?
+        did_config = False
+
+        add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
+        self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)
+
+        remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
+        self.log.debug('Hosts that will lose daemons: %s' % remove_daemon_hosts)
+
+        for host, network, name in add_daemon_hosts:
+            daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons,
+                                                 prefix=spec.service_id,
+                                                 forcename=name)
+
+            if not did_config and config_func:
+                if daemon_type == 'rgw':
+                    rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
+                    rgw_config_func(cast(RGWSpec, spec), daemon_id)
+                else:
+                    config_func(spec)
+                did_config = True
+
+            daemon_spec = self.mgr.cephadm_services[daemon_type].make_daemon_spec(
+                host, daemon_id, network, spec)
+            self.log.debug('Placing %s.%s on host %s' % (
+                daemon_type, daemon_id, host))
+
+            try:
+                daemon_spec = self.mgr.cephadm_services[daemon_type].prepare_create(daemon_spec)
+                self.mgr._create_daemon(daemon_spec)
+                r = True
+            except (RuntimeError, OrchestratorError) as e:
+                self.mgr.events.for_service(spec, 'ERROR',
+                                            f"Failed while placing {daemon_type}.{daemon_id} "
+                                            f"on {host}: {e}")
+                # only return "no change" if no one else has already succeeded.
+                # later successes will also change to True
+                if r is None:
+                    r = False
+                continue
+
+            # add to daemon list so next name(s) will also be unique
+            sd = orchestrator.DaemonDescription(
+                hostname=host,
+                daemon_type=daemon_type,
+                daemon_id=daemon_id,
+            )
+            daemons.append(sd)
+
+        # remove any?
+        def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
+            daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
+            r = self.mgr.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
+            return not r.retval
+
+        while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
+            # let's find a subset that is ok-to-stop
+            remove_daemon_hosts.pop()
+        for d in remove_daemon_hosts:
+            r = True
+            # NOTE: we are passing the 'force' flag here, which means
+            # we can delete a mon instance's data.
+            self.mgr._remove_daemon(d.name(), d.hostname)
+
+        if r is None:
+            r = False
+        return r
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index 143ce9206df4..0fda71c7799a 100644
--- a/src/pybind/mgr/cephadm/tests/test_cephadm.py
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -324,7 +324,7 @@ class TestCephadm(object):
     def test_mgr_update(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
-            r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
+            r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
             assert r
 
             assert_rm_daemon(cephadm_module, 'mgr.a', 'test')
@@ -527,7 +527,7 @@ class TestCephadm(object):
            match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
 
         ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
-        r = cephadm_module._apply_service(
+        r = CephadmServe(cephadm_module)._apply_service(
            RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
         assert r
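
After this patch, callers no longer invoke _apply_service on the mgr module directly; they wrap the module in a CephadmServe instance, as the updated tests above do. Below is a minimal sketch of the new call path. It assumes an initialized CephadmOrchestrator (here cephadm_module, as the test fixtures provide), import paths consistent with those used in the patch, and a purely illustrative helper name (apply_mgr_spec); it is not part of the commit.

    from ceph.deployment.service_spec import PlacementSpec, ServiceSpec

    from cephadm.serve import CephadmServe


    def apply_mgr_spec(cephadm_module) -> bool:
        # cephadm_module is assumed to be a configured CephadmOrchestrator.
        # CephadmServe only keeps a reference to it (self.mgr) and reaches
        # shared state (cache, spec store, events) through that reference,
        # which is what allows _apply_service to live in serve.py.
        ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
        spec = ServiceSpec('mgr', placement=ps)
        # _apply_service returns True if it deployed or removed any daemon
        # for the spec, and False if nothing changed.
        return CephadmServe(cephadm_module)._apply_service(spec)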