git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: add new args and validation for staggered upgrade
author: Adam King <adking@redhat.com>
Wed, 30 Mar 2022 13:49:56 +0000 (09:49 -0400)
committer: Adam King <adking@redhat.com>
Thu, 19 May 2022 21:24:56 +0000 (17:24 -0400)
Signed-off-by: Adam King <adking@redhat.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/upgrade.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py

index fdb6ca616c882b1da11f1b361305db2ebb0a5563..53e08335c25dab6413604db4fa03416b4343efca 100644 (file)
@@ -60,7 +60,7 @@ from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore,
 from .upgrade import CephadmUpgrade
 from .template import TemplateMgr
 from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
-    cephadmNoImage
+    cephadmNoImage, CEPH_UPGRADE_ORDER
 from .configchecks import CephadmConfigChecks
 from .offline_watcher import OfflineHostWatcher
 
@@ -2692,10 +2692,36 @@ Then run the following:
         return self.upgrade.upgrade_ls(image, tags, show_all_versions)
 
     @handle_orch_error
-    def upgrade_start(self, image: str, version: str) -> str:
+    def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None,
+                      services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
         if self.inventory.get_host_with_state("maintenance"):
             raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state")
-        return self.upgrade.upgrade_start(image, version)
+        if daemon_types is not None and services is not None:
+            raise OrchestratorError('--daemon-types and --services are mutually exclusive')
+        if daemon_types is not None:
+            for dtype in daemon_types:
+                if dtype not in CEPH_UPGRADE_ORDER:
+                    raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n'
+                                            f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}')
+        if services is not None:
+            for service in services:
+                if service not in self.spec_store:
+                    raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n'
+                                            f'Known services are: {self.spec_store.all_specs.keys()}')
+        hosts: Optional[List[str]] = None
+        if host_placement is not None:
+            all_hosts = list(self.inventory.all_specs())
+            placement = PlacementSpec.from_string(host_placement)
+            hosts = placement.filter_matching_hostspecs(all_hosts)
+            if not hosts:
+                raise OrchestratorError(
+                    f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')
+
+        if limit is not None:
+            if limit < 1:
+                raise OrchestratorError(f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')
+
+        return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)
 
     @handle_orch_error
     def upgrade_pause(self) -> str:
index 1d959f3a6b006eb7a96c5b7c555111e53c12b4a8..c4e4f74b278d3ac366d3dcca0cd435a58aa67323 100644 (file)
@@ -8,7 +8,8 @@ import orchestrator
 from cephadm.registry import Registry
 from cephadm.serve import CephadmServe
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
-from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, MONITORING_STACK_TYPES
+from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, \
+    MONITORING_STACK_TYPES, CEPH_TYPES, GATEWAY_TYPES
 from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service
 
 if TYPE_CHECKING:
@@ -239,7 +240,8 @@ class CephadmUpgrade:
             r["tags"] = sorted(ls)
         return r
 
-    def upgrade_start(self, image: str, version: str) -> str:
+    def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
+                      hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
         if self.mgr.mode != 'root':
             raise OrchestratorError('upgrade is not supported in %s mode' % (
                 self.mgr.mode))
@@ -252,6 +254,10 @@ class CephadmUpgrade:
             target_name = normalize_image_digest(image, self.mgr.default_registry)
         else:
             raise OrchestratorError('must specify either image or version')
+
+        if daemon_types is not None or services is not None or hosts is not None:
+            self._validate_upgrade_filters(target_name, daemon_types, hosts, services)
+
         if self.upgrade_state:
             if self.upgrade_state._target_name != target_name:
                 raise OrchestratorError(
@@ -280,6 +286,91 @@ class CephadmUpgrade:
         self.mgr.event.set()
         return 'Initiating upgrade to %s' % (target_name)
 
+    def _validate_upgrade_filters(self, target_name: str, daemon_types: Optional[List[str]] = None, hosts: Optional[List[str]] = None, services: Optional[List[str]] = None) -> None:
+        """Check that a filtered (staggered) upgrade respects the upgrade order.
+
+        Builds the list of daemons that must already be upgraded for the
+        requested filters to be acceptable, and raises if any of them still
+        needs upgrading. Daemons whose type is in ``MONITORING_STACK_TYPES``
+        are left out of this check.
+
+        :param target_name: image being upgraded to; its info is fetched with
+            ``CephadmServe._get_container_image_info``
+        :param daemon_types: daemon types selected for upgrade, if any
+        :param hosts: hostnames selected for upgrade, if any
+        :param services: service names selected for upgrade, if any; they
+            must all share one service type
+        :raises OrchestratorError: if an upgrade is already in progress, the
+            image info cannot be retrieved, the services span several types,
+            or daemons earlier in the upgrade order still need upgrading
+        """
+        def _latest_type(dtypes: List[str]) -> str:
+            # the member of dtypes that comes last in the upgrade order,
+            # or '' when none of them appears in CEPH_UPGRADE_ORDER
+            for daemon_type in reversed(CEPH_UPGRADE_ORDER):
+                if daemon_type in dtypes:
+                    return daemon_type
+            return ''
+
+        def _get_earlier_daemons(dtypes: List[str], candidates: List[DaemonDescription]) -> List[DaemonDescription]:
+            # this function takes a list of daemon types and first finds the daemon
+            # type from that list that is latest in our upgrade order. Then, from
+            # that latest type, it filters the list of candidate daemons received
+            # for daemons with types earlier in the upgrade order than the latest
+            # type found earlier. That filtered list of daemons is returned. The
+            # purpose of this function is to help in finding daemons that must have
+            # already been upgraded for the given filtering parameters (--daemon-types,
+            # --services, --hosts) to be valid.
+            latest = _latest_type(dtypes)
+            if not latest:
+                return []
+            # every type positioned before the latest one in the upgrade order,
+            # minus the types that are themselves part of this upgrade. Slicing by
+            # position matches the exact type name, never a substring of another.
+            earlier_types = [t for t in CEPH_UPGRADE_ORDER[:CEPH_UPGRADE_ORDER.index(latest)]
+                             if t not in dtypes]
+            return [d for d in candidates if d.daemon_type in earlier_types]
+
+        if self.upgrade_state:
+            raise OrchestratorError('Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
+        try:
+            target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(target_name))
+        except OrchestratorError as e:
+            # chain the original error so its traceback is not lost
+            raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}') from e
+        # what we need to do here is build a list of daemons that must already be upgraded
+        # in order for the user's selection of daemons to upgrade to be valid. for example,
+        # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block.
+        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type not in MONITORING_STACK_TYPES]
+        err_msg_base = 'Cannot start upgrade. '
+        # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters
+        dtypes: List[str] = []
+        if daemon_types is not None:
+            dtypes = daemon_types
+            if hosts is not None:
+                # with a host filter only the latest requested type is kept, so
+                # earlier requested types on the *other* hosts are checked too
+                dtypes = [_latest_type(dtypes)]
+                other_host_daemons = [
+                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
+                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
+            else:
+                daemons = _get_earlier_daemons(dtypes, daemons)
+            err_msg_base += 'Daemons with types earlier in upgrade order than given types need upgrading.\n'
+        elif services is not None:
+            # for our purposes here we can effectively convert our list of services into the
+            # set of daemon types the services contain. This works because we don't allow --services
+            # and --daemon-types at the same time and we only allow services of the same type
+            sspecs = [self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
+            stypes = list(set([s.service_type for s in sspecs]))
+            if len(stypes) != 1:
+                raise OrchestratorError('Doing upgrade by service only support services of one type at '
+                                        f'a time. Found service types: {stypes}')
+            for stype in stypes:
+                dtypes += orchestrator.service_to_daemon_types(stype)
+            dtypes = list(set(dtypes))
+            if hosts is not None:
+                other_host_daemons = [
+                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
+                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
+            else:
+                daemons = _get_earlier_daemons(dtypes, daemons)
+            err_msg_base += 'Daemons with types earlier in upgrade order than daemons from given services need upgrading.\n'
+        elif hosts is not None:
+            # hosts must be handled a bit differently. For this, we really need to find all the daemon types
+            # that reside on hosts in the list of hosts we will upgrade. Then take the type from
+            # that list that is latest in the upgrade order and check if any daemons on hosts not in the
+            # provided list of hosts have a daemon with a type earlier in the upgrade order that is not upgraded.
+            dtypes = list(set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
+            other_hosts_daemons = [d for d in daemons if d.hostname is not None and d.hostname not in hosts]
+            daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons)
+            err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n'
+        # NOTE(review): n1 and n2 are presumably the "needs upgrade" and "needs redeploy"
+        # lists of (daemon, flag) tuples returned by _detect_need_upgrade - confirm there
+        need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests)
+        if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)):
+            # also report active mgr as needing to be upgraded. It is not included in the resulting list
+            # by default as it is treated special and handled via the need_upgrade_self bool
+            n1.insert(0, (self.mgr.mgr_service.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr')), True))
+        if n1 or n2:
+            raise OrchestratorError(f'{err_msg_base}Please first upgrade '
+                                    f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n'
+                                    f'NOTE: Enforced upgrade order is: {" -> ".join(CEPH_TYPES + GATEWAY_TYPES)}')
+
     def upgrade_pause(self) -> str:
         if not self.upgrade_state:
             raise OrchestratorError('No upgrade in progress')
index 214df05ee4bf62db0731da31c562bae2ca390ca4..6622372f87cb17ec9d3556aa0993b4a06ca41ac5 100644 (file)
@@ -674,7 +674,8 @@ class Orchestrator(object):
     def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
         raise NotImplementedError()
 
-    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
+    def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]] = None,
+                      hosts: Optional[str] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> OrchResult[str]:
+        """Start an upgrade; to be implemented by the orchestrator backend.
+
+        The four filter arguments default to ``None`` so that callers passing
+        only ``image`` and ``version`` keep working.
+
+        :param image: container image to upgrade to
+        :param version: ceph version to upgrade to
+        :param daemon_types: daemon types to restrict the upgrade to
+        :param hosts: host placement string to restrict the upgrade to
+        :param services: service names to restrict the upgrade to
+        :param limit: optional integer limit handed to the backend
+        :raises NotImplementedError: always, in this base class
+        """
         raise NotImplementedError()
 
     def upgrade_pause(self) -> OrchResult[str]:
index 96222a64a4311ac5f5cb6492bd383b6e2a42a0c9..3ce7fb907fb35ec78eedca45264c4e68ecc20d01 100644 (file)
@@ -1435,10 +1435,16 @@ Usage:
     def _upgrade_start(self,
                        image: Optional[str] = None,
                        _end_positional_: int = 0,
+                       daemon_types: Optional[str] = None,
+                       hosts: Optional[str] = None,
+                       services: Optional[str] = None,
+                       limit: Optional[int] = None,
                        ceph_version: Optional[str] = None) -> HandleCommandResult:
         """Initiate upgrade"""
         self._upgrade_check_image_name(image, ceph_version)
-        completion = self.upgrade_start(image, ceph_version)
+        dtypes = daemon_types.split(',') if daemon_types is not None else None
+        service_names = services.split(',') if services is not None else None
+        completion = self.upgrade_start(image, ceph_version, dtypes, hosts, service_names, limit)
         raise_if_exception(completion)
         return HandleCommandResult(stdout=completion.result_str())