git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: split _do_upgrade into sub functions
authorAdam King <adking@redhat.com>
Mon, 28 Mar 2022 16:10:15 +0000 (12:10 -0400)
committerAdam King <adking@redhat.com>
Thu, 19 May 2022 21:24:56 +0000 (17:24 -0400)
This function was around 500 lines and difficult to work
with. Splitting it into sub functions should hopefully make
it a bit easier to understand and make changes to.

Signed-off-by: Adam King <adking@redhat.com>
src/pybind/mgr/cephadm/upgrade.py

index bb157e7f4d760f643fa0956263f42380806663e0..1d959f3a6b006eb7a96c5b7c555111e53c12b4a8 100644 (file)
@@ -534,6 +534,263 @@ class CephadmUpgrade:
 
         return True  # if mds has no fs it should pass ok-to-stop
 
+    def _detect_need_upgrade(self, daemons: List[DaemonDescription], target_digests: Optional[List[str]] = None) -> Tuple[bool, List[Tuple[DaemonDescription, bool]], List[Tuple[DaemonDescription, bool]], int]:
+        # this function takes a list of daemons and container digests. The purpose
+        # is to go through each daemon and check if the current container digests
+        # for that daemon match the target digests. The purpose being that we determine
+        # if a daemon is upgraded to a certain container image or not based on what
+        # container digests it has. By checking the current digests against the
+        # targets we can determine which daemons still need to be upgraded
+        #
+        # Returns a 4-tuple:
+        #   need_upgrade_self     - True if the active mgr (ourselves) still needs upgrading
+        #   need_upgrade          - daemons whose image digest does not match the target
+        #   need_upgrade_deployer - daemons on the correct image but deployed by an
+        #                           older version (need redeploy, not re-pull)
+        #   done                  - count of daemons already fully upgraded
+        need_upgrade_self = False
+        need_upgrade: List[Tuple[DaemonDescription, bool]] = []
+        need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
+        done = 0
+        if target_digests is None:
+            target_digests = []
+        for d in daemons:
+            assert d.daemon_type is not None
+            assert d.daemon_id is not None
+            assert d.hostname is not None
+            # skip daemons whose host metadata may be stale when the agent is in use;
+            # we can't trust the cached digests for them yet
+            if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname):
+                continue
+            correct_digest = False
+            # NOTE(review): the generator variables below shadow the loop variable
+            # 'd'; harmless (generator expressions have their own scope) but easy
+            # to misread.
+            if (any(d in target_digests for d in (d.container_image_digests or []))
+                    or d.daemon_type in MONITORING_STACK_TYPES):
+                logger.debug('daemon %s.%s container digest correct' % (
+                    d.daemon_type, d.daemon_id))
+                correct_digest = True
+                if any(d in target_digests for d in (d.deployed_by or [])):
+                    logger.debug('daemon %s.%s deployed by correct version' % (
+                        d.daemon_type, d.daemon_id))
+                    done += 1
+                    continue
+
+            # the active mgr is handled specially (via fail_over), not via the
+            # normal redeploy path
+            if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
+                logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
+                            self.mgr.get_mgr_id())
+                need_upgrade_self = True
+                continue
+
+            if correct_digest:
+                logger.debug('daemon %s.%s not deployed by correct version' % (
+                    d.daemon_type, d.daemon_id))
+                need_upgrade_deployer.append((d, True))
+            else:
+                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
+                    d.daemon_type, d.daemon_id,
+                    d.container_image_name, d.container_image_digests, d.version))
+                need_upgrade.append((d, False))
+
+        return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)
+
+    def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
+        # Filter 'need_upgrade' down to the daemons that are currently safe to
+        # stop/restart. Returns (continue, to_upgrade): 'continue' is False when
+        # an ok-to-stop check did not pass yet, signalling the caller to bail
+        # out of this upgrade iteration and retry later.
+        to_upgrade: List[Tuple[DaemonDescription, bool]] = []
+        known_ok_to_stop: List[str] = []
+        for d_entry in need_upgrade:
+            d = d_entry[0]
+            assert d.daemon_type is not None
+            assert d.daemon_id is not None
+            assert d.hostname is not None
+
+            # digest unknown, but the image name already matches the target:
+            # nothing to do for this daemon
+            if not d.container_image_id:
+                if d.container_image_name == target_image:
+                    logger.debug(
+                        'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
+                    continue
+
+            # once one ok-to-stop batch is known, only daemons from that batch
+            # are considered in this iteration
+            if known_ok_to_stop:
+                if d.name() in known_ok_to_stop:
+                    logger.info(f'Upgrade: {d.name()} is also safe to restart')
+                    to_upgrade.append(d_entry)
+                continue
+
+            if d.daemon_type == 'osd':
+                # NOTE: known_ok_to_stop is an output argument for
+                # _wait_for_ok_to_stop
+                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+                    return False, to_upgrade
+
+            if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
+                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+                    return False, to_upgrade
+
+            if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
+                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+                    return False, to_upgrade
+
+            to_upgrade.append(d_entry)
+
+            # if we don't have a list of others to consider, stop now
+            if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
+                break
+        return True, to_upgrade
+
+    def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None:
+        # Redeploy each daemon in 'to_upgrade' on the target image. For every
+        # daemon we first verify (and if needed pull) the target image on its
+        # host; any failure raises a health check via _fail_upgrade and returns
+        # early, leaving the remaining daemons for the next iteration.
+        assert self.upgrade_state is not None
+        num = 1
+        if target_digests is None:
+            target_digests = []
+        for d_entry in to_upgrade:
+            # d_entry is (daemon, deployer_only); deployer_only=True means the
+            # image is already correct and only a redeploy is needed
+            d = d_entry[0]
+            assert d.daemon_type is not None
+            assert d.daemon_id is not None
+            assert d.hostname is not None
+
+            # make sure host has latest container image
+            out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+                d.hostname, '', 'inspect-image', [],
+                image=target_image, no_fsid=True, error_ok=True))
+            if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
+                logger.info('Upgrade: Pulling %s on %s' % (target_image,
+                                                           d.hostname))
+                self.upgrade_info_str = 'Pulling %s image on host %s' % (
+                    target_image, d.hostname)
+                out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+                    d.hostname, '', 'pull', [],
+                    image=target_image, no_fsid=True, error_ok=True))
+                if code:
+                    self._fail_upgrade('UPGRADE_FAILED_PULL', {
+                        'severity': 'warning',
+                        'summary': 'Upgrade: failed to pull target image',
+                        'count': 1,
+                        'detail': [
+                            'failed to pull %s on host %s' % (target_image,
+                                                              d.hostname)],
+                    })
+                    return
+                r = json.loads(''.join(out))
+                # the tag may have moved since the upgrade started; record the
+                # new digests and restart the upgrade loop against them
+                if not any(d in target_digests for d in r.get('repo_digests', [])):
+                    logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
+                        target_image, d.hostname, r['repo_digests'], target_digests))
+                    self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
+                        target_image, d.hostname, r['repo_digests'], target_digests)
+                    self.upgrade_state.target_digests = r['repo_digests']
+                    self._save_upgrade_state()
+                    return
+
+                self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)
+
+            if len(to_upgrade) > 1:
+                logger.info('Upgrade: Updating %s.%s (%d/%d)' %
+                            (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
+            else:
+                logger.info('Upgrade: Updating %s.%s' %
+                            (d.daemon_type, d.daemon_id))
+            action = 'Upgrading' if not d_entry[1] else 'Redeploying'
+            try:
+                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
+                self.mgr._daemon_action(
+                    daemon_spec,
+                    'redeploy',
+                    image=target_image if not d_entry[1] else None
+                )
+                # force a metadata refresh for this host on the next pass
+                self.mgr.cache.metadata_up_to_date[d.hostname] = False
+            except Exception as e:
+                self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
+                    'severity': 'warning',
+                    'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
+                    'count': 1,
+                    'detail': [
+                        f'Upgrade daemon: {d.name()}: {e}'
+                    ],
+                })
+                return
+            num += 1
+
+    def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
+        # If the active mgr itself must be upgraded, fail over to a standby
+        # (the upgraded standby takes over and continues the upgrade). If mgrs
+        # are done upgrading, clear any stale no-standby health warning.
+        if need_upgrade_self:
+            try:
+                self.mgr.mgr_service.fail_over()
+            except OrchestratorError as e:
+                self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
+                    'severity': 'warning',
+                    'summary': f'Upgrade: {e}',
+                    'count': 1,
+                    'detail': [
+                        'The upgrade process needs to upgrade the mgr, '
+                        'but it needs at least one standby to proceed.',
+                    ],
+                })
+                return
+
+            return  # unreachable code, as fail_over never returns
+        elif upgrading_mgrs:
+            if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
+                del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
+                self.mgr.set_health_checks(self.mgr.health_checks)
+
+    def _set_container_images(self, daemon_type: str, target_image: str, image_settings: Dict[str, str]) -> None:
+        # push down configs
+        # Point the daemon type's config section at the target image, then drop
+        # any stale per-daemon 'container_image' overrides for that type.
+        daemon_type_section = name_to_config_section(daemon_type)
+        if image_settings.get(daemon_type_section) != target_image:
+            logger.info('Upgrade: Setting container_image for all %s' %
+                        daemon_type)
+            self.mgr.set_container_image(daemon_type_section, target_image)
+        to_clean = []
+        # per-daemon sections look like '<section>.<id>'
+        for section in image_settings.keys():
+            if section.startswith(name_to_config_section(daemon_type) + '.'):
+                to_clean.append(section)
+        if to_clean:
+            logger.debug('Upgrade: Cleaning up container_image for %s' %
+                         to_clean)
+            for section in to_clean:
+                ret, image, err = self.mgr.check_mon_command({
+                    'prefix': 'config rm',
+                    'name': 'container_image',
+                    'who': section,
+                })
+
+    def _complete_osd_upgrade(self, target_major: str, target_major_name: str) -> None:
+        # After all OSDs are upgraded, bump 'require_osd_release' up to the
+        # target major release if the osdmap still reports an older minimum.
+        osdmap = self.mgr.get("osd_map")
+        # "argonaut" is the oldest possible release; used when the field is unset
+        osd_min_name = osdmap.get("require_osd_release", "argonaut")
+        osd_min = ceph_release_to_major(osd_min_name)
+        if osd_min < int(target_major):
+            logger.info(
+                f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
+            ret, _, err = self.mgr.check_mon_command({
+                'prefix': 'osd require-osd-release',
+                'release': target_major_name,
+            })
+
+    def _complete_mds_upgrade(self) -> None:
+        # After all MDS daemons are upgraded, restore the per-filesystem
+        # 'max_mds' and 'allow_standby_replay' settings that were saved (and
+        # reduced/disabled) when the upgrade started, then clear the saved
+        # originals from the persisted upgrade state.
+        assert self.upgrade_state is not None
+        if self.upgrade_state.fs_original_max_mds:
+            for fs in self.mgr.get("fs_map")['filesystems']:
+                fscid = fs["id"]
+                fs_name = fs['mdsmap']['fs_name']
+                # default of 1 means nothing to scale back up
+                new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
+                if new_max > 1:
+                    self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
+                        fs_name, new_max
+                    ))
+                    ret, _, err = self.mgr.check_mon_command({
+                        'prefix': 'fs set',
+                        'fs_name': fs_name,
+                        'var': 'max_mds',
+                        'val': str(new_max),
+                    })
+
+            self.upgrade_state.fs_original_max_mds = {}
+            self._save_upgrade_state()
+        if self.upgrade_state.fs_original_allow_standby_replay:
+            for fs in self.mgr.get("fs_map")['filesystems']:
+                fscid = fs["id"]
+                fs_name = fs['mdsmap']['fs_name']
+                asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
+                if asr:
+                    self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
+                        fs_name
+                    ))
+                    ret, _, err = self.mgr.check_mon_command({
+                        'prefix': 'fs set',
+                        'fs_name': fs_name,
+                        'var': 'allow_standby_replay',
+                        'val': '1'
+                    })
+
+            self.upgrade_state.fs_original_allow_standby_replay = {}
+            self._save_upgrade_state()
+
     def _do_upgrade(self):
         # type: () -> None
         if not self.upgrade_state:
@@ -549,7 +806,7 @@ class CephadmUpgrade:
         if not target_id or not target_version or not target_digests:
             # need to learn the container hash
             logger.info('Upgrade: First pull of %s' % target_image)
-            self.upgrade_info_str: str = 'Doing first pull of %s image' % (target_image)
+            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
             try:
                 target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(
                     target_image))
@@ -617,48 +874,13 @@ class CephadmUpgrade:
         })
 
         daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
-        done = 0
+        upgraded_daemon_count: int = 0
         for daemon_type in CEPH_UPGRADE_ORDER:
             logger.debug('Upgrade: Checking %s daemons' % daemon_type)
 
-            need_upgrade_self = False
-            need_upgrade: List[Tuple[DaemonDescription, bool]] = []
-            need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
-            for d in daemons:
-                if d.daemon_type != daemon_type:
-                    continue
-                assert d.daemon_type is not None
-                assert d.daemon_id is not None
-                assert d.hostname is not None
-                if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname):
-                    continue
-                correct_digest = False
-                if (any(d in target_digests for d in (d.container_image_digests or []))
-                        or d.daemon_type in MONITORING_STACK_TYPES):
-                    logger.debug('daemon %s.%s container digest correct' % (
-                        daemon_type, d.daemon_id))
-                    correct_digest = True
-                    if any(d in target_digests for d in (d.deployed_by or [])):
-                        logger.debug('daemon %s.%s deployed by correct version' % (
-                            d.daemon_type, d.daemon_id))
-                        done += 1
-                        continue
-
-                if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
-                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
-                                self.mgr.get_mgr_id())
-                    need_upgrade_self = True
-                    continue
-
-                if correct_digest:
-                    logger.debug('daemon %s.%s not deployed by correct version' % (
-                        d.daemon_type, d.daemon_id))
-                    need_upgrade_deployer.append((d, True))
-                else:
-                    logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
-                        daemon_type, d.daemon_id,
-                        d.container_image_name, d.container_image_digests, d.version))
-                    need_upgrade.append((d, False))
+            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(daemons, target_digests)
+            upgraded_daemon_count += done
+            self._update_upgrade_progress(upgraded_daemon_count / len(self.mgr.cache.get_daemons()))
 
             if not need_upgrade_self:
                 # only after the mgr itself is upgraded can we expect daemons to have
@@ -676,115 +898,10 @@ class CephadmUpgrade:
             if need_upgrade:
                 self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)
 
-            to_upgrade: List[Tuple[DaemonDescription, bool]] = []
-            known_ok_to_stop: List[str] = []
-            for d_entry in need_upgrade:
-                d = d_entry[0]
-                assert d.daemon_type is not None
-                assert d.daemon_id is not None
-                assert d.hostname is not None
-
-                if not d.container_image_id:
-                    if d.container_image_name == target_image:
-                        logger.debug(
-                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
-                        continue
-
-                if known_ok_to_stop:
-                    if d.name() in known_ok_to_stop:
-                        logger.info(f'Upgrade: {d.name()} is also safe to restart')
-                        to_upgrade.append(d_entry)
-                    continue
-
-                if d.daemon_type == 'osd':
-                    # NOTE: known_ok_to_stop is an output argument for
-                    # _wait_for_ok_to_stop
-                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
-                        return
-
-                if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
-                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
-                        return
-
-                if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
-                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
-                        return
-
-                to_upgrade.append(d_entry)
-
-                # if we don't have a list of others to consider, stop now
-                if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
-                    break
-
-            num = 1
-            for d_entry in to_upgrade:
-                d = d_entry[0]
-                assert d.daemon_type is not None
-                assert d.daemon_id is not None
-                assert d.hostname is not None
-
-                self._update_upgrade_progress(done / len(daemons))
-
-                # make sure host has latest container image
-                out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
-                    d.hostname, '', 'inspect-image', [],
-                    image=target_image, no_fsid=True, error_ok=True))
-                if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
-                    logger.info('Upgrade: Pulling %s on %s' % (target_image,
-                                                               d.hostname))
-                    self.upgrade_info_str = 'Pulling %s image on host %s' % (
-                        target_image, d.hostname)
-                    out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
-                        d.hostname, '', 'pull', [],
-                        image=target_image, no_fsid=True, error_ok=True))
-                    if code:
-                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
-                            'severity': 'warning',
-                            'summary': 'Upgrade: failed to pull target image',
-                            'count': 1,
-                            'detail': [
-                                'failed to pull %s on host %s' % (target_image,
-                                                                  d.hostname)],
-                        })
-                        return
-                    r = json.loads(''.join(out))
-                    if not any(d in target_digests for d in r.get('repo_digests', [])):
-                        logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
-                            target_image, d.hostname, r['repo_digests'], target_digests))
-                        self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
-                            target_image, d.hostname, r['repo_digests'], target_digests)
-                        self.upgrade_state.target_digests = r['repo_digests']
-                        self._save_upgrade_state()
-                        return
-
-                    self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)
-
-                if len(to_upgrade) > 1:
-                    logger.info('Upgrade: Updating %s.%s (%d/%d)' %
-                                (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
-                else:
-                    logger.info('Upgrade: Updating %s.%s' %
-                                (d.daemon_type, d.daemon_id))
-                action = 'Upgrading' if not d_entry[1] else 'Redeploying'
-                try:
-                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
-                    self.mgr._daemon_action(
-                        daemon_spec,
-                        'redeploy',
-                        image=target_image if not d_entry[1] else None
-                    )
-                    self.mgr.cache.metadata_up_to_date[d.hostname] = False
-                except Exception as e:
-                    self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
-                        'severity': 'warning',
-                        'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
-                        'count': 1,
-                        'detail': [
-                            f'Upgrade daemon: {d.name()}: {e}'
-                        ],
-                    })
-                    return
-                num += 1
+            _continue, to_upgrade = self._to_upgrade(need_upgrade, target_image)
+            if not _continue:
+                return
+            self._upgrade_daemons(to_upgrade, target_image, target_digests)
             if to_upgrade:
                 return
 
@@ -794,26 +911,7 @@ class CephadmUpgrade:
                     logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                     need_upgrade_self = True
 
-            if need_upgrade_self:
-                try:
-                    self.mgr.mgr_service.fail_over()
-                except OrchestratorError as e:
-                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
-                        'severity': 'warning',
-                        'summary': f'Upgrade: {e}',
-                        'count': 1,
-                        'detail': [
-                            'The upgrade process needs to upgrade the mgr, '
-                            'but it needs at least one standby to proceed.',
-                        ],
-                    })
-                    return
-
-                return  # unreachable code, as fail_over never returns
-            elif daemon_type == 'mgr':
-                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
-                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
-                    self.mgr.set_health_checks(self.mgr.health_checks)
+            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')
 
             # make sure 'ceph versions' agrees
             ret, out_ver, err = self.mgr.check_mon_command({
@@ -827,77 +925,15 @@ class CephadmUpgrade:
                         'Upgrade: %d %s daemon(s) are %s != target %s' %
                         (count, daemon_type, short_version, target_version))
 
-            # push down configs
-            daemon_type_section = name_to_config_section(daemon_type)
-            if image_settings.get(daemon_type_section) != target_image:
-                logger.info('Upgrade: Setting container_image for all %s' %
-                            daemon_type)
-                self.mgr.set_container_image(daemon_type_section, target_image)
-            to_clean = []
-            for section in image_settings.keys():
-                if section.startswith(name_to_config_section(daemon_type) + '.'):
-                    to_clean.append(section)
-            if to_clean:
-                logger.debug('Upgrade: Cleaning up container_image for %s' %
-                             to_clean)
-                for section in to_clean:
-                    ret, image, err = self.mgr.check_mon_command({
-                        'prefix': 'config rm',
-                        'name': 'container_image',
-                        'who': section,
-                    })
+            self._set_container_images(daemon_type, target_image, image_settings)
 
             # complete osd upgrade?
             if daemon_type == 'osd':
-                osdmap = self.mgr.get("osd_map")
-                osd_min_name = osdmap.get("require_osd_release", "argonaut")
-                osd_min = ceph_release_to_major(osd_min_name)
-                if osd_min < int(target_major):
-                    logger.info(
-                        f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
-                    ret, _, err = self.mgr.check_mon_command({
-                        'prefix': 'osd require-osd-release',
-                        'release': target_major_name,
-                    })
+                self._complete_osd_upgrade(target_major, target_major_name)
 
             # complete mds upgrade?
             if daemon_type == 'mds':
-                if self.upgrade_state.fs_original_max_mds:
-                    for fs in self.mgr.get("fs_map")['filesystems']:
-                        fscid = fs["id"]
-                        fs_name = fs['mdsmap']['fs_name']
-                        new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
-                        if new_max > 1:
-                            self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
-                                fs_name, new_max
-                            ))
-                            ret, _, err = self.mgr.check_mon_command({
-                                'prefix': 'fs set',
-                                'fs_name': fs_name,
-                                'var': 'max_mds',
-                                'val': str(new_max),
-                            })
-
-                    self.upgrade_state.fs_original_max_mds = {}
-                    self._save_upgrade_state()
-                if self.upgrade_state.fs_original_allow_standby_replay:
-                    for fs in self.mgr.get("fs_map")['filesystems']:
-                        fscid = fs["id"]
-                        fs_name = fs['mdsmap']['fs_name']
-                        asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
-                        if asr:
-                            self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
-                                fs_name
-                            ))
-                            ret, _, err = self.mgr.check_mon_command({
-                                'prefix': 'fs set',
-                                'fs_name': fs_name,
-                                'var': 'allow_standby_replay',
-                                'val': '1'
-                            })
-
-                    self.upgrade_state.fs_original_allow_standby_replay = {}
-                    self._save_upgrade_state()
+                self._complete_mds_upgrade()
 
             # Make sure all metadata is up to date before saying we are done upgrading this daemon type
             if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():