]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: make use of new upgrade control parameters
authorAdam King <adking@redhat.com>
Fri, 1 Apr 2022 13:41:01 +0000 (09:41 -0400)
committerAdam King <adking@redhat.com>
Sat, 21 May 2022 22:11:21 +0000 (18:11 -0400)
Fixes: https://tracker.ceph.com/issues/54135
Signed-off-by: Adam King <adking@redhat.com>
(cherry picked from commit c1f3497b43bff6f7640161807dce01dc089ce405)

Conflicts:
src/pybind/mgr/cephadm/upgrade.py

src/pybind/mgr/cephadm/upgrade.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py

index 70b09be1b9cb9701637124cec7a796571576ab0c..3cedae582442d75d43640beb16b47b01dd1010d5 100644 (file)
@@ -59,7 +59,12 @@ class UpgradeState:
                  error: Optional[str] = None,
                  paused: Optional[bool] = None,
                  fs_original_max_mds: Optional[Dict[str, int]] = None,
-                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None
+                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
+                 daemon_types: Optional[List[str]] = None,
+                 hosts: Optional[List[str]] = None,
+                 services: Optional[List[str]] = None,
+                 total_count: Optional[int] = None,
+                 remaining_count: Optional[int] = None,
                  ):
         self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
         self.progress_id: str = progress_id
@@ -69,7 +74,13 @@ class UpgradeState:
         self.error: Optional[str] = error
         self.paused: bool = paused or False
         self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
-        self.fs_original_allow_standby_replay: Optional[Dict[str, bool]] = fs_original_allow_standby_replay
+        self.fs_original_allow_standby_replay: Optional[Dict[str,
+                                                             bool]] = fs_original_allow_standby_replay
+        self.daemon_types = daemon_types
+        self.hosts = hosts
+        self.services = services
+        self.total_count = total_count
+        self.remaining_count = remaining_count
 
     def to_json(self) -> dict:
         return {
@@ -82,6 +93,11 @@ class UpgradeState:
             'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
             'error': self.error,
             'paused': self.paused,
+            'daemon_types': self.daemon_types,
+            'hosts': self.hosts,
+            'services': self.services,
+            'total_count': self.total_count,
+            'remaining_count': self.remaining_count,
         }
 
     @classmethod
@@ -131,6 +147,23 @@ class CephadmUpgrade:
             r.target_image = self.target_image
             r.in_progress = True
             r.progress, r.services_complete = self._get_upgrade_info()
+
+            if self.upgrade_state.daemon_types is not None:
+                which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
+                if self.upgrade_state.hosts is not None:
+                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
+            elif self.upgrade_state.services is not None:
+                which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}'
+                if self.upgrade_state.hosts is not None:
+                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
+            elif self.upgrade_state.hosts is not None:
+                which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
+            else:
+                which_str = 'Upgrading all daemon types on all hosts'
+            if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
+                which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).'
+            r.which = which_str
+
             # accessing self.upgrade_info_str will throw an exception if it
             # has not been set in _do_upgrade yet
             try:
@@ -147,7 +180,7 @@ class CephadmUpgrade:
         if not self.upgrade_state or not self.upgrade_state.target_digests:
             return '', []
 
-        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
+        daemons = self._get_filtered_daemons()
 
         if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
             return '', []
@@ -162,6 +195,30 @@ class CephadmUpgrade:
 
         return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types
 
+    def _get_filtered_daemons(self) -> List[DaemonDescription]:
+        # Return the set of daemons set to be upgraded with out current
+        # filtering parameters (or all daemons in upgrade order if no filtering
+        # parameter are set).
+        assert self.upgrade_state is not None
+        if self.upgrade_state.daemon_types is not None:
+            daemons = [d for d in self.mgr.cache.get_daemons(
+            ) if d.daemon_type in self.upgrade_state.daemon_types]
+        elif self.upgrade_state.services is not None:
+            daemons = []
+            for service in self.upgrade_state.services:
+                daemons += self.mgr.cache.get_daemons_by_service(service)
+        else:
+            daemons = [d for d in self.mgr.cache.get_daemons(
+            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
+        if self.upgrade_state.hosts is not None:
+            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
+        return daemons
+
+    def _get_current_version(self) -> Tuple[int, int, str]:
+        current_version = self.mgr.version.split('ceph version ')[1]
+        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
+        return (int(current_major), int(current_minor), current_version)
+
     def _check_target_version(self, version: str) -> Optional[str]:
         try:
             (major, minor, _) = version.split('.', 2)
@@ -267,7 +324,12 @@ class CephadmUpgrade:
         self.mgr.log.info('Upgrade: Started with target %s' % target_name)
         self.upgrade_state = UpgradeState(
             target_name=target_name,
-            progress_id=str(uuid.uuid4())
+            progress_id=str(uuid.uuid4()),
+            daemon_types=daemon_types,
+            hosts=hosts,
+            services=services,
+            total_count=limit,
+            remaining_count=limit,
         )
         self._update_upgrade_progress(0.0)
         self._save_upgrade_state()
@@ -707,6 +769,9 @@ class CephadmUpgrade:
         if target_digests is None:
             target_digests = []
         for d_entry in to_upgrade:
+            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
+                self.mgr.log.info(f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
+                return
             d = d_entry[0]
             assert d.daemon_type is not None
             assert d.daemon_id is not None
@@ -747,8 +812,7 @@ class CephadmUpgrade:
                 self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)
 
             if len(to_upgrade) > 1:
-                logger.info('Upgrade: Updating %s.%s (%d/%d)' %
-                            (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
+                logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade), self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
             else:
                 logger.info('Upgrade: Updating %s.%s' %
                             (d.daemon_type, d.daemon_id))
@@ -771,6 +835,9 @@ class CephadmUpgrade:
                 })
                 return
             num += 1
+            if self.upgrade_state.remaining_count is not None and not d_entry[1]:
+                self.upgrade_state.remaining_count -= 1
+                self._save_upgrade_state()
 
     def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
         if need_upgrade_self:
@@ -866,6 +933,17 @@ class CephadmUpgrade:
             self.upgrade_state.fs_original_allow_standby_replay = {}
             self._save_upgrade_state()
 
+    def _mark_upgrade_complete(self) -> None:
+        if not self.upgrade_state:
+            logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting')
+            return
+        logger.info('Upgrade: Complete!')
+        if self.upgrade_state.progress_id:
+            self.mgr.remote('progress', 'complete',
+                            self.upgrade_state.progress_id)
+        self.upgrade_state = None
+        self._save_upgrade_state()
+
     def _do_upgrade(self):
         # type: () -> None
         if not self.upgrade_state:
@@ -948,14 +1026,53 @@ class CephadmUpgrade:
             'who': 'mon',
         })
 
-        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
+        if self.upgrade_state.daemon_types is not None:
+            logger.debug(f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
+            daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in self.upgrade_state.daemon_types]
+        elif self.upgrade_state.services is not None:
+            logger.debug(f'Filtering daemons to upgrade by services: {self.upgrade_state.daemon_types}')
+            daemons = []
+            for service in self.upgrade_state.services:
+                daemons += self.mgr.cache.get_daemons_by_service(service)
+        else:
+            daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
+        if self.upgrade_state.hosts is not None:
+            logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
+            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
         upgraded_daemon_count: int = 0
         for daemon_type in CEPH_UPGRADE_ORDER:
+            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0:
+                # we hit our limit and should end the upgrade
+                # except for cases where we only need to redeploy, but not actually upgrade
+                # the image (which we don't count towards our limit). This case only occurs with mgr
+                # and monitoring stack daemons. Additionally, this case is only valid if
+                # the active mgr is already upgraded.
+                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
+                    if daemon_type not in MONITORING_STACK_TYPES and daemon_type != 'mgr':
+                        continue
+                else:
+                    self._mark_upgrade_complete()
+                    return
             logger.debug('Upgrade: Checking %s daemons' % daemon_type)
+            daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]
 
-            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(daemons, target_digests)
+            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(daemons_of_type, target_digests)
             upgraded_daemon_count += done
-            self._update_upgrade_progress(upgraded_daemon_count / len(self.mgr.cache.get_daemons()))
+            self._update_upgrade_progress(upgraded_daemon_count / len(daemons))
+
+            # make sure mgr and monitoring stack daemons are properly redeployed in staggered upgrade scenarios
+            if daemon_type == 'mgr' or daemon_type in MONITORING_STACK_TYPES:
+                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
+                    need_upgrade_names = [d[0].name() for d in need_upgrade] + [d[0].name() for d in need_upgrade_deployer]
+                    dds = [d for d in self.mgr.cache.get_daemons_by_type(daemon_type) if d.name() not in need_upgrade_names]
+                    need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests)
+                    if not n1:
+                        if not need_upgrade_self and need_upgrade_active:
+                            need_upgrade_self = True
+                        need_upgrade_deployer += n2
+                else:
+                    # no point in trying to redeploy with new version if active mgr is not on the new version
+                    need_upgrade_deployer = []
 
             if not need_upgrade_self:
                 # only after the mgr itself is upgraded can we expect daemons to have
@@ -980,6 +1097,15 @@ class CephadmUpgrade:
             if to_upgrade:
                 return
 
+            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')
+
+            # following bits of _do_upgrade are for completing upgrade for given
+            # types. If we haven't actually finished upgrading all the daemons
+            # of this type, we should exit the loop here
+            _, n1, n2, _ = self._detect_need_upgrade(self.mgr.cache.get_daemons_by_type(daemon_type), target_digests)
+            if n1 or n2:
+                continue
+
             # complete mon upgrade?
             if daemon_type == 'mon':
                 if not self.mgr.get("have_local_config_map"):
@@ -1012,6 +1138,8 @@ class CephadmUpgrade:
             if daemon_type == 'mds':
                 self._complete_mds_upgrade()
 
+            logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type)
+
         # clean up
         logger.info('Upgrade: Finalizing container_image settings')
         self.mgr.set_container_image('global', target_image)
@@ -1029,10 +1157,5 @@ class CephadmUpgrade:
             'who': 'mon',
         })
 
-        logger.info('Upgrade: Complete!')
-        if self.upgrade_state.progress_id:
-            self.mgr.remote('progress', 'complete',
-                            self.upgrade_state.progress_id)
-        self.upgrade_state = None
-        self._save_upgrade_state()
+        self._mark_upgrade_complete()
         return
index 914a4bc52f69aa617f83730371e088bd39f97ecb..a41e7ad2d743942b10e220b95045b42be2b36a59 100644 (file)
@@ -770,6 +770,7 @@ class UpgradeStatusSpec(object):
         self.in_progress = False  # Is an upgrade underway?
         self.target_image: Optional[str] = None
         self.services_complete: List[str] = []  # Which daemon types are fully updated?
+        self.which: str = '<unknown>'  # for if user specified daemon types, services or hosts
         self.progress: Optional[str] = None  # How many of the daemons have we upgraded
         self.message = ""  # Freeform description
 
index 785daf4d8ce557d585a642adf2c92dd2e91dbf66..0235cbf67822429e46fbb3056130bd068ef3e4b4 100644 (file)
@@ -1386,6 +1386,7 @@ Usage:
         r = {
             'target_image': status.target_image,
             'in_progress': status.in_progress,
+            'which': status.which,
             'services_complete': status.services_complete,
             'progress': status.progress,
             'message': status.message,