mgr/cephadm: move _apply_service to serve.py
author    Sebastian Wagner <sebastian.wagner@suse.com>
          Fri, 11 Sep 2020 11:26:46 +0000 (13:26 +0200)
committer Sebastian Wagner <sebastian.wagner@suse.com>
          Wed, 18 Nov 2020 10:52:17 +0000 (11:52 +0100)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
(cherry picked from commit 6b2664a9e3dd531d99b5cc7f01384298e0bdc055)

src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/tests/test_cephadm.py

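For context, the sketch below (plain Python, not part of the commit; MgrStub and SpecStub are hypothetical stand-ins for CephadmOrchestrator and ServiceSpec, not the real cephadm classes) illustrates the delegation pattern this change settles on: CephadmServe owns serve-loop logic such as _apply_service, while the mgr module object is only consulted for state (spec store, daemon cache, service registry).

# Illustrative sketch only -- the stub classes are hypothetical stand-ins,
# not the real cephadm types.
from typing import List


class SpecStub:
    def __init__(self, service_type: str, unmanaged: bool = False) -> None:
        self.service_type = service_type
        self.unmanaged = unmanaged


class MgrStub:
    """Plays the role of the mgr module: holds state, no apply logic."""

    def __init__(self, specs: List[SpecStub]) -> None:
        self.specs = specs


class CephadmServe:
    """Plays the role of serve.py: owns _apply_service and its helpers."""

    def __init__(self, mgr: MgrStub) -> None:
        self.mgr = mgr

    def _apply_all_services(self) -> bool:
        changed = False
        for spec in self.mgr.specs:
            # mirrors the call-site change in this diff:
            # self.mgr._apply_service(spec) became self._apply_service(spec)
            if self._apply_service(spec):
                changed = True
        return changed

    def _apply_service(self, spec: SpecStub) -> bool:
        if spec.unmanaged:
            return False  # skipped, as in the real method below
        return True       # pretend a daemon was deployed


if __name__ == '__main__':
    mgr = MgrStub([SpecStub('mds', unmanaged=True), SpecStub('mgr')])
    print(CephadmServe(mgr)._apply_all_services())  # -> True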
diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index da93918d5b6de666606a1bbad8d5abb0d932870d..1672f6abc87b3592313c5a92dc1cad16b1b08fb9 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -47,7 +47,7 @@ from .services.nfs import NFSService
 from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
     NodeExporterService
-from .schedule import HostAssignment, HostPlacementSpec
+from .schedule import HostAssignment
 from .inventory import Inventory, SpecStore, HostCache, EventStore
 from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
 from .template import TemplateMgr
@@ -1839,147 +1839,6 @@ To check that the host is reachable:
 
             return "Removed {} from host '{}'".format(name, host)
 
-    def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
-        fn = {
-            'mds': self.mds_service.config,
-            'rgw': self.rgw_service.config,
-            'nfs': self.nfs_service.config,
-            'iscsi': self.iscsi_service.config,
-        }.get(service_type)
-        return cast(Callable[[ServiceSpec], None], fn)
-
-    def _apply_service(self, spec: ServiceSpec) -> bool:
-        """
-        Schedule a service.  Deploy new daemons or remove old ones, depending
-        on the target label and count specified in the placement.
-        """
-        self.migration.verify_no_migration()
-
-        daemon_type = spec.service_type
-        service_name = spec.service_name()
-        if spec.unmanaged:
-            self.log.debug('Skipping unmanaged service %s' % service_name)
-            return False
-        if spec.preview_only:
-            self.log.debug('Skipping preview_only service %s' % service_name)
-            return False
-        self.log.debug('Applying service %s spec' % service_name)
-
-        config_func = self._config_fn(daemon_type)
-
-        if daemon_type == 'osd':
-            self.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
-            # TODO: returning True here would result in a busy loop
-            # can't know if daemon count changed; create_from_spec doesn't
-            # return a solid indication
-            return False
-
-        daemons = self.cache.get_daemons_by_service(service_name)
-
-        public_network = None
-        if daemon_type == 'mon':
-            ret, out, err = self.check_mon_command({
-                'prefix': 'config get',
-                'who': 'mon',
-                'key': 'public_network',
-            })
-            if '/' in out:
-                public_network = out.strip()
-                self.log.debug('mon public_network is %s' % public_network)
-
-        def matches_network(host):
-            # type: (str) -> bool
-            if not public_network:
-                return False
-            # make sure we have 1 or more IPs for that network on that
-            # host
-            return len(self.cache.networks[host].get(public_network, [])) > 0
-
-        ha = HostAssignment(
-            spec=spec,
-            hosts=self._hosts_with_daemon_inventory(),
-            get_daemons_func=self.cache.get_daemons_by_service,
-            filter_new_host=matches_network if daemon_type == 'mon' else None,
-        )
-
-        hosts: List[HostPlacementSpec] = ha.place()
-        self.log.debug('Usable hosts: %s' % hosts)
-
-        r = None
-
-        # sanity check
-        if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
-            self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
-            return False
-
-        # add any?
-        did_config = False
-
-        add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
-        self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)
-
-        remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
-        self.log.debug('Hosts that will lose daemons: %s' % remove_daemon_hosts)
-
-        for host, network, name in add_daemon_hosts:
-            daemon_id = self.get_unique_name(daemon_type, host, daemons,
-                                             prefix=spec.service_id,
-                                             forcename=name)
-
-            if not did_config and config_func:
-                if daemon_type == 'rgw':
-                    rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
-                    rgw_config_func(cast(RGWSpec, spec), daemon_id)
-                else:
-                    config_func(spec)
-                did_config = True
-
-            daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
-                host, daemon_id, network, spec)
-            self.log.debug('Placing %s.%s on host %s' % (
-                daemon_type, daemon_id, host))
-
-            try:
-                daemon_spec = self.cephadm_services[daemon_type].prepare_create(daemon_spec)
-                self._create_daemon(daemon_spec)
-                r = True
-            except (RuntimeError, OrchestratorError) as e:
-                self.events.for_service(spec, 'ERROR',
-                    f"Failed while placing {daemon_type}.{daemon_id}"
-                    "on {host}: {e}")
-                # only return "no change" if no one else has already succeeded.
-                # later successes will also change to True
-                if r is None:
-                    r = False
-                continue
-
-            # add to daemon list so next name(s) will also be unique
-            sd = orchestrator.DaemonDescription(
-                hostname=host,
-                daemon_type=daemon_type,
-                daemon_id=daemon_id,
-            )
-            daemons.append(sd)
-
-        # remove any?
-        def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
-            daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
-            r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
-            return not r.retval
-
-        while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
-            # let's find a subset that is ok-to-stop
-            remove_daemon_hosts.pop()
-        for d in remove_daemon_hosts:
-            r = True
-            # NOTE: we are passing the 'force' flag here, which means
-            # we can delete a mon instance's data.
-            self._remove_daemon(d.name(), d.hostname)
-
-        if r is None:
-            r = False
-        return r
-
     def _check_pool_exists(self, pool, service_name):
         logger.info(f'Checking pool "{pool}" exists for service {service_name}')
         if not self.rados.pool_exists(pool):
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
index f00e61f2167a45007d4f9653dd73efd65dce7019..186ba06782322c0b9560bff9ba777dbaa14e053b 100644
--- a/src/pybind/mgr/cephadm/serve.py
+++ b/src/pybind/mgr/cephadm/serve.py
@@ -2,7 +2,7 @@ import datetime
 import json
 import logging
 from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List
+from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set
 
 try:
     import remoto
@@ -10,9 +10,11 @@ except ImportError:
     remoto = None
 
 from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec
 
 import orchestrator
+from cephadm.schedule import HostAssignment
 from cephadm.utils import forall_hosts, cephadmNoImage, str_to_datetime
 from orchestrator import OrchestratorError
 
@@ -335,7 +337,8 @@ class CephadmServe:
                     name = '%s.%s' % (s.get('type'), s.get('id'))
                     if s.get('type') == 'rbd-mirror':
                         defaults = defaultdict(lambda: None, {'id': None})
-                        metadata = self.mgr.get_metadata("rbd-mirror", s.get('id'), default=defaults)
+                        metadata = self.mgr.get_metadata(
+                            "rbd-mirror", s.get('id'), default=defaults)
                         if metadata['id']:
                             name = '%s.%s' % (s.get('type'), metadata['id'])
                         else:
@@ -378,7 +381,7 @@ class CephadmServe:
             specs.append(spec)
         for spec in specs:
             try:
-                if self.mgr._apply_service(spec):
+                if self._apply_service(spec):
                     r = True
             except Exception as e:
                 self.log.exception('Failed to apply %s spec %s: %s' % (
@@ -386,3 +389,144 @@ class CephadmServe:
                 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
 
         return r
+
+    def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
+        fn = {
+            'mds': self.mgr.mds_service.config,
+            'rgw': self.mgr.rgw_service.config,
+            'nfs': self.mgr.nfs_service.config,
+            'iscsi': self.mgr.iscsi_service.config,
+        }.get(service_type)
+        return cast(Callable[[ServiceSpec], None], fn)
+
+    def _apply_service(self, spec: ServiceSpec) -> bool:
+        """
+        Schedule a service.  Deploy new daemons or remove old ones, depending
+        on the target label and count specified in the placement.
+        """
+        self.mgr.migration.verify_no_migration()
+
+        daemon_type = spec.service_type
+        service_name = spec.service_name()
+        if spec.unmanaged:
+            self.log.debug('Skipping unmanaged service %s' % service_name)
+            return False
+        if spec.preview_only:
+            self.log.debug('Skipping preview_only service %s' % service_name)
+            return False
+        self.log.debug('Applying service %s spec' % service_name)
+
+        config_func = self._config_fn(daemon_type)
+
+        if daemon_type == 'osd':
+            self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
+            # TODO: returning True here would result in a busy loop
+            # can't know if daemon count changed; create_from_spec doesn't
+            # return a solid indication
+            return False
+
+        daemons = self.mgr.cache.get_daemons_by_service(service_name)
+
+        public_network = None
+        if daemon_type == 'mon':
+            ret, out, err = self.mgr.check_mon_command({
+                'prefix': 'config get',
+                'who': 'mon',
+                'key': 'public_network',
+            })
+            if '/' in out:
+                public_network = out.strip()
+                self.log.debug('mon public_network is %s' % public_network)
+
+        def matches_network(host):
+            # type: (str) -> bool
+            if not public_network:
+                return False
+            # make sure we have 1 or more IPs for that network on that
+            # host
+            return len(self.mgr.cache.networks[host].get(public_network, [])) > 0
+
+        ha = HostAssignment(
+            spec=spec,
+            hosts=self.mgr._hosts_with_daemon_inventory(),
+            get_daemons_func=self.mgr.cache.get_daemons_by_service,
+            filter_new_host=matches_network if daemon_type == 'mon' else None,
+        )
+
+        hosts: List[HostPlacementSpec] = ha.place()
+        self.log.debug('Usable hosts: %s' % hosts)
+
+        r = None
+
+        # sanity check
+        if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
+            self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
+            return False
+
+        # add any?
+        did_config = False
+
+        add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
+        self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)
+
+        remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
+        self.log.debug('Hosts that will lose daemons: %s' % remove_daemon_hosts)
+
+        for host, network, name in add_daemon_hosts:
+            daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons,
+                                                 prefix=spec.service_id,
+                                                 forcename=name)
+
+            if not did_config and config_func:
+                if daemon_type == 'rgw':
+                    rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
+                    rgw_config_func(cast(RGWSpec, spec), daemon_id)
+                else:
+                    config_func(spec)
+                did_config = True
+
+            daemon_spec = self.mgr.cephadm_services[daemon_type].make_daemon_spec(
+                host, daemon_id, network, spec)
+            self.log.debug('Placing %s.%s on host %s' % (
+                daemon_type, daemon_id, host))
+
+            try:
+                daemon_spec = self.mgr.cephadm_services[daemon_type].prepare_create(daemon_spec)
+                self.mgr._create_daemon(daemon_spec)
+                r = True
+            except (RuntimeError, OrchestratorError) as e:
+                self.mgr.events.for_service(spec, 'ERROR',
+                                            f"Failed while placing {daemon_type}.{daemon_id}"
+                                            "on {host}: {e}")
+                # only return "no change" if no one else has already succeeded.
+                # later successes will also change to True
+                if r is None:
+                    r = False
+                continue
+
+            # add to daemon list so next name(s) will also be unique
+            sd = orchestrator.DaemonDescription(
+                hostname=host,
+                daemon_type=daemon_type,
+                daemon_id=daemon_id,
+            )
+            daemons.append(sd)
+
+        # remove any?
+        def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
+            daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
+            r = self.mgr.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
+            return not r.retval
+
+        while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
+            # let's find a subset that is ok-to-stop
+            remove_daemon_hosts.pop()
+        for d in remove_daemon_hosts:
+            r = True
+            # NOTE: we are passing the 'force' flag here, which means
+            # we can delete a mon instance's data.
+            self.mgr._remove_daemon(d.name(), d.hostname)
+
+        if r is None:
+            r = False
+        return r
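One detail of _apply_service worth calling out: before removing daemons it repeatedly drops arbitrary candidates from remove_daemon_hosts until the remaining set passes ok_to_stop. A minimal, self-contained sketch of that pattern (the predicate here is an illustrative stand-in, not the real CephService.ok_to_stop() call):

# Hypothetical sketch of the "shrink until ok-to-stop" loop used above.
from typing import Callable, Set


def trim_to_stoppable(candidates: Set[str],
                      ok_to_stop: Callable[[Set[str]], bool]) -> Set[str]:
    # Pop arbitrary members until what is left can be stopped safely
    # (or the set is empty).
    while candidates and not ok_to_stop(candidates):
        candidates.pop()
    return candidates


if __name__ == '__main__':
    # e.g. a quorum-style rule: never stop more than one daemon at a time
    print(trim_to_stoppable({'mon.a', 'mon.b', 'mon.c'},
                            lambda s: len(s) <= 1))  # -> a single mon left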
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index 143ce9206df4602250a805fd6b6401bff1c8149e..0fda71c7799a72a81254fdf9c253c3e8ac532c00 100644
--- a/src/pybind/mgr/cephadm/tests/test_cephadm.py
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -324,7 +324,7 @@ class TestCephadm(object):
     def test_mgr_update(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
-            r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
+            r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
             assert r
 
             assert_rm_daemon(cephadm_module, 'mgr.a', 'test')
@@ -527,7 +527,7 @@ class TestCephadm(object):
                 match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
 
                 ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
-                r = cephadm_module._apply_service(
+                r = CephadmServe(cephadm_module)._apply_service(
                     RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
                 assert r