]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: Add migration to the new scheduler.
authorSebastian Wagner <sebastian.wagner@suse.com>
Wed, 6 May 2020 10:50:18 +0000 (12:50 +0200)
committerSebastian Wagner <sebastian.wagner@suse.com>
Mon, 15 Jun 2020 10:42:17 +0000 (12:42 +0200)
New scheduler that takes PlacementSpec as the bound and not as recommendation.

Which means, we have to make sure, we're not removing any daemons directly after
upgrading to the new scheduler.

There is a potential race here:

1. user updates his spec to remove daemons
2. mgr gets upgrades to new scheduler, before the old scheduler removed the daemon
3. now, we're converting the spec to explicit placement, thus reverting (1.)

I think this is ok.

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
(cherry picked from commit 074745217703ab57692fe8234a82130dc49179d6)

src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/migrations.py [new file with mode: 0644]
src/pybind/mgr/cephadm/module.py

index e107ee4abb46586fe1b248c67941a315e323b766..05510654b825a327103d618d6b10f53c53047ac2 100644 (file)
@@ -427,4 +427,15 @@ class HostCache():
     def rm_daemon(self, host, name):
         if host in self.daemons:
             if name in self.daemons[host]:
-                del self.daemons[host][name]
\ No newline at end of file
+                del self.daemons[host][name]
+
+    def daemon_cache_filled(self):
+        """
+        i.e. we have checked the daemons for each hosts at least once.
+        excluding offline hosts.
+
+        We're not checking for `host_needs_daemon_refresh`, as this might never be
+        False for all hosts.
+        """
+        return all((h in self.last_daemon_update or h in self.mgr.offline_hosts)
+                   for h in self.get_hosts())
diff --git a/src/pybind/mgr/cephadm/migrations.py b/src/pybind/mgr/cephadm/migrations.py
new file mode 100644 (file)
index 0000000..0c964c7
--- /dev/null
@@ -0,0 +1,128 @@
+import logging
+from typing import TYPE_CHECKING, Iterator
+
+from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
+from cephadm.schedule import HostAssignment
+
+from orchestrator import OrchestratorError
+
+if TYPE_CHECKING:
+    from .module import CephadmOrchestrator
+
+LAST_MIGRATION = 1
+
+logger = logging.getLogger(__name__)
+
+
+class Migrations:
+    def __init__(self, mgr: "CephadmOrchestrator"):
+        self.mgr = mgr
+
+        # Why having a global counter, instead of spec versions?
+        #
+        # for the first migration:
+        # The specs don't change in (this) migration. but the scheduler here.
+        # Adding the version to the specs at this time just felt wrong to me.
+        #
+        # And the specs are only another part of cephadm which needs potential upgrades.
+        # We have the cache, the inventory, the config store, the upgrade (imagine changing the
+        # upgrade code, while an old upgrade is still in progress), naming of daemons,
+        # fs-layout of the daemons, etc.
+        if self.mgr.migration_current is None:
+            self.set(0)
+
+        # for some migrations, we don't need to do anything except for
+        # setting migration_current = 1.
+        # let's try to shortcut things here.
+        self.migrate()
+
+    def set(self, val):
+        self.mgr.set_module_option('migration_current', val)
+        self.mgr.migration_current = val
+
+    def is_migration_ongoing(self):
+        return self.mgr.migration_current != LAST_MIGRATION
+
+    def verify_no_migration(self):
+        if self.is_migration_ongoing():
+            # this is raised in module.serve()
+            raise OrchestratorError("cephadm migration still ongoing. Please wait, until the migration is complete.")
+
+    def migrate(self):
+        if self.mgr.migration_current == 0:
+            if self.migrate_0_1():
+                self.set(1)
+
+    def migrate_0_1(self) -> bool:
+        """
+        Migration 0 -> 1
+        New scheduler that takes PlacementSpec as the bound and not as recommendation.
+        I.e. the new scheduler won't suggest any new placements outside of the hosts
+        specified by label etc.
+
+        Which means, we have to make sure, we're not removing any daemons directly after
+        upgrading to the new scheduler.
+
+        There is a potential race here:
+        1. user updates his spec to remove daemons
+        2. mgr gets upgraded to new scheduler, before the old scheduler removed the daemon
+        3. now, we're converting the spec to explicit placement, thus reverting (1.)
+        I think this is ok.
+        """
+
+        def interesting_specs() -> Iterator[ServiceSpec]:
+            for s in self.mgr.spec_store.specs.values():
+                if s.unmanaged:
+                    continue
+                p = s.placement
+                if p is None:
+                    continue
+                if p.count is None:
+                    continue
+                if not p.hosts and not p.host_pattern and not p.label:
+                    continue
+                yield s
+
+        def convert_to_explicit(spec: ServiceSpec) -> None:
+            placements = HostAssignment(
+                spec=spec,
+                get_hosts_func=self.mgr._get_hosts,
+                get_daemons_func=self.mgr.cache.get_daemons_by_service
+            ).place()
+
+            existing_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
+
+            # We have to migrate, only if the new scheduler would remove daemons
+            if len(placements) >= len(existing_daemons):
+                return
+
+            old_hosts = {h.hostname: h for h in spec.placement.hosts}
+            new_hosts = [
+                old_hosts[d.hostname] if d.hostname in old_hosts else HostPlacementSpec(hostname=d.hostname, network='', name='')
+                for d in existing_daemons
+            ]
+
+            new_placement = PlacementSpec(
+                hosts=new_hosts,
+                count=spec.placement.count
+            )
+
+            new_spec = ServiceSpec.from_json(spec.to_json())
+            new_spec.placement = new_placement
+
+            logger.info(f"Migrating {spec.one_line_str()} to explicit placement")
+
+            self.mgr.spec_store.save(new_spec)
+
+        specs = list(interesting_specs())
+        if not specs:
+            return True  # nothing to do. shortcut
+
+        if not self.mgr.cache.daemon_cache_filled():
+            logger.info("Unable to migrate yet. Daemon Cache still incomplete.")
+            return False
+
+        for spec in specs:
+            convert_to_explicit(spec)
+
+        return True
index 318e5de471891ebb8a9338481d73a24517e6a564..f862a70e366a9b1a6e4da97e1ae4c3b4d08afb0d 100644 (file)
@@ -32,6 +32,7 @@ from orchestrator._interface import GenericSpec
 
 from . import remotes
 from . import utils
+from .migrations import Migrations
 from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
     RbdMirrorService, CrashService, CephadmService
 from .services.iscsi import IscsiService
@@ -239,6 +240,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
             'desc': 'location of alerts to include in prometheus deployments',
         },
+        {
+            'name': 'migration_current',
+            'type': 'int',
+            'default': None,
+            'desc': 'internal - do not modify',
+            # used to track track spec and other data migrations.
+        },
     ]
 
     def __init__(self, *args, **kwargs):
@@ -271,6 +279,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             self.warn_on_failed_host_check = True
             self.allow_ptrace = False
             self.prometheus_alerts_path = ''
+            self.migration_current = None
 
         self._cons = {}  # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
 
@@ -316,6 +325,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         # in-memory only.
         self.offline_hosts: Set[str] = set()
 
+        self.migration = Migrations(self)
+
         # services:
         self.osd_service = OSDService(self)
         self.nfs_service = NFSService(self)
@@ -484,6 +495,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
                 self.rm_util._remove_osds_bg()
 
+                self.migration.migrate()
+                if self.migration.is_migration_ongoing():
+                    continue
+
                 if self._apply_all_services():
                     continue  # did something, refresh
 
@@ -1982,6 +1997,8 @@ you may want to run:
         return self._add_daemon('mgr', spec, self.mgr_service.create)
 
     def _apply(self, spec: GenericSpec) -> str:
+        self.migration.verify_no_migration()
+
         if spec.service_type == 'host':
             return self._add_host(cast(HostSpec, spec))