From 7e3917e856e7cb1bc0fe27eb3b7418d56b200a8a Mon Sep 17 00:00:00 2001
From: Adam King
Date: Mon, 28 Mar 2022 12:10:15 -0400
Subject: [PATCH] mgr/cephadm: split _do_upgrade into sub functions

This function was around 500 lines and difficult to work with.
Splitting it into sub functions should make it easier to understand
and modify.

Signed-off-by: Adam King
(cherry picked from commit 7b83c51fe63ae006b15dcf509c08a722f104788e)

Conflicts:
	src/pybind/mgr/cephadm/upgrade.py
---
 src/pybind/mgr/cephadm/upgrade.py | 495 ++++++++++++++++--------------
 1 file changed, 266 insertions(+), 229 deletions(-)

diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py
index e2bde0034a7f9..b3de00e16b293 100644
--- a/src/pybind/mgr/cephadm/upgrade.py
+++ b/src/pybind/mgr/cephadm/upgrade.py
@@ -520,6 +520,260 @@ class CephadmUpgrade:
 
         return True  # if mds has no fs it should pass ok-to-stop
 
+    def _detect_need_upgrade(self, daemons: List[DaemonDescription], target_digests: Optional[List[str]] = None) -> Tuple[bool, List[Tuple[DaemonDescription, bool]], List[Tuple[DaemonDescription, bool]], int]:
+        # this function takes a list of daemons and container digests. The purpose
+        # is to go through each daemon and check if the current container digests
+        # for that daemon match the target digests. The purpose being that we determine
+        # if a daemon is upgraded to a certain container image or not based on what
+        # container digests it has. By checking the current digests against the
+        # targets we can determine which daemons still need to be upgraded
+        need_upgrade_self = False
+        need_upgrade: List[Tuple[DaemonDescription, bool]] = []
+        need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
+        done = 0
+        if target_digests is None:
+            target_digests = []
+        for d in daemons:
+            assert d.daemon_type is not None
+            assert d.daemon_id is not None
+            assert d.hostname is not None
+            correct_digest = False
+            if (any(d in target_digests for d in (d.container_image_digests or []))
+                    or d.daemon_type in MONITORING_STACK_TYPES):
+                logger.debug('daemon %s.%s container digest correct' % (
+                    d.daemon_type, d.daemon_id))
+                correct_digest = True
+                if any(d in target_digests for d in (d.deployed_by or [])):
+                    logger.debug('daemon %s.%s deployed by correct version' % (
+                        d.daemon_type, d.daemon_id))
+                    done += 1
+                    continue
+
+            if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
+                logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
+                            self.mgr.get_mgr_id())
+                need_upgrade_self = True
+                continue
+
+            if correct_digest:
+                logger.debug('daemon %s.%s not deployed by correct version' % (
+                    d.daemon_type, d.daemon_id))
+                need_upgrade_deployer.append((d, True))
+            else:
+                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
+                    d.daemon_type, d.daemon_id,
+                    d.container_image_name, d.container_image_digests, d.version))
+                need_upgrade.append((d, False))
+
+        return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)
+
+    def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
+        to_upgrade: List[Tuple[DaemonDescription, bool]] = []
+        known_ok_to_stop: List[str] = []
+        for d_entry in need_upgrade:
+            d = d_entry[0]
+            assert d.daemon_type is not None
+            assert d.daemon_id is not None
+            assert d.hostname is not None
+
+            if not d.container_image_id:
+                if d.container_image_name == target_image:
+                    logger.debug(
+                        'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
+                    continue
+
+            if known_ok_to_stop:
+                if d.name() in known_ok_to_stop:
+                    logger.info(f'Upgrade: {d.name()} is also safe to restart')
+                    to_upgrade.append(d_entry)
+                continue
+
+            if d.daemon_type == 'osd':
+                # NOTE: known_ok_to_stop is an output argument for
+                # _wait_for_ok_to_stop
+                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+                    return False, to_upgrade
+
+            if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
+                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+                    return False, to_upgrade
+
+            if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
+                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+                    return False, to_upgrade
+
+            to_upgrade.append(d_entry)
+
+            # if we don't have a list of others to consider, stop now
+            if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
+                break
+        return True, to_upgrade
+
+    def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None:
+        assert self.upgrade_state is not None
+        num = 1
+        if target_digests is None:
+            target_digests = []
+        for d_entry in to_upgrade:
+            d = d_entry[0]
+            assert d.daemon_type is not None
+            assert d.daemon_id is not None
+            assert d.hostname is not None
+
+            # make sure host has latest container image
+            out, errs, code = CephadmServe(self.mgr)._run_cephadm(
+                d.hostname, '', 'inspect-image', [],
+                image=target_image, no_fsid=True, error_ok=True)
+            if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
+                logger.info('Upgrade: Pulling %s on %s' % (target_image,
+                                                           d.hostname))
+                self.upgrade_info_str = 'Pulling %s image on host %s' % (
+                    target_image, d.hostname)
+                out, errs, code = CephadmServe(self.mgr)._run_cephadm(
+                    d.hostname, '', 'pull', [],
+                    image=target_image, no_fsid=True, error_ok=True)
+                if code:
+                    self._fail_upgrade('UPGRADE_FAILED_PULL', {
+                        'severity': 'warning',
+                        'summary': 'Upgrade: failed to pull target image',
+                        'count': 1,
+                        'detail': [
+                            'failed to pull %s on host %s' % (target_image,
+                                                              d.hostname)],
+                    })
+                    return
+                r = json.loads(''.join(out))
+                if not any(d in target_digests for d in r.get('repo_digests', [])):
+                    logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
+                        target_image, d.hostname, r['repo_digests'], target_digests))
+                    self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
+                        target_image, d.hostname, r['repo_digests'], target_digests)
+                    self.upgrade_state.target_digests = r['repo_digests']
+                    self._save_upgrade_state()
+                    return
+
+            self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)
+
+            if len(to_upgrade) > 1:
+                logger.info('Upgrade: Updating %s.%s (%d/%d)' %
+                            (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
+            else:
+                logger.info('Upgrade: Updating %s.%s' %
+                            (d.daemon_type, d.daemon_id))
+            action = 'Upgrading' if not d_entry[1] else 'Redeploying'
+            try:
+                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
+                self.mgr._daemon_action(
+                    daemon_spec,
+                    'redeploy',
+                    image=target_image if not d_entry[1] else None
+                )
+            except Exception as e:
+                self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
+                    'severity': 'warning',
+                    'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
+                    'count': 1,
+                    'detail': [
+                        f'Upgrade daemon: {d.name()}: {e}'
+                    ],
+                })
+                return
+            num += 1
+
+    def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
+        if need_upgrade_self:
+            try:
+                self.mgr.mgr_service.fail_over()
+            except OrchestratorError as e:
+                self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
+                    'severity': 'warning',
+                    'summary': f'Upgrade: {e}',
+                    'count': 1,
+                    'detail': [
+                        'The upgrade process needs to upgrade the mgr, '
+                        'but it needs at least one standby to proceed.',
+                    ],
+                })
+                return
+
+            return  # unreachable code, as fail_over never returns
+        elif upgrading_mgrs:
+            if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
+                del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
+                self.mgr.set_health_checks(self.mgr.health_checks)
+
+    def _set_container_images(self, daemon_type: str, target_image: str, image_settings: Dict[str, str]) -> None:
+        # push down configs
+        daemon_type_section = name_to_config_section(daemon_type)
+        if image_settings.get(daemon_type_section) != target_image:
+            logger.info('Upgrade: Setting container_image for all %s' %
+                        daemon_type)
+            self.mgr.set_container_image(daemon_type_section, target_image)
+        to_clean = []
+        for section in image_settings.keys():
+            if section.startswith(name_to_config_section(daemon_type) + '.'):
+                to_clean.append(section)
+        if to_clean:
+            logger.debug('Upgrade: Cleaning up container_image for %s' %
+                         to_clean)
+            for section in to_clean:
+                ret, image, err = self.mgr.check_mon_command({
+                    'prefix': 'config rm',
+                    'name': 'container_image',
+                    'who': section,
+                })
+
+    def _complete_osd_upgrade(self, target_major: str, target_major_name: str) -> None:
+        osdmap = self.mgr.get("osd_map")
+        osd_min_name = osdmap.get("require_osd_release", "argonaut")
+        osd_min = ceph_release_to_major(osd_min_name)
+        if osd_min < int(target_major):
+            logger.info(
+                f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
+            ret, _, err = self.mgr.check_mon_command({
+                'prefix': 'osd require-osd-release',
+                'release': target_major_name,
+            })
+
+    def _complete_mds_upgrade(self) -> None:
+        assert self.upgrade_state is not None
+        if self.upgrade_state.fs_original_max_mds:
+            for fs in self.mgr.get("fs_map")['filesystems']:
+                fscid = fs["id"]
+                fs_name = fs['mdsmap']['fs_name']
+                new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
+                if new_max > 1:
+                    self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
+                        fs_name, new_max
+                    ))
+                    ret, _, err = self.mgr.check_mon_command({
+                        'prefix': 'fs set',
+                        'fs_name': fs_name,
+                        'var': 'max_mds',
+                        'val': str(new_max),
+                    })
+
+            self.upgrade_state.fs_original_max_mds = {}
+            self._save_upgrade_state()
+        if self.upgrade_state.fs_original_allow_standby_replay:
+            for fs in self.mgr.get("fs_map")['filesystems']:
+                fscid = fs["id"]
+                fs_name = fs['mdsmap']['fs_name']
+                asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
+                if asr:
+                    self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
+                        fs_name
+                    ))
+                    ret, _, err = self.mgr.check_mon_command({
+                        'prefix': 'fs set',
+                        'fs_name': fs_name,
+                        'var': 'allow_standby_replay',
+                        'val': '1'
+                    })
+
+            self.upgrade_state.fs_original_allow_standby_replay = {}
+            self._save_upgrade_state()
+
     def _do_upgrade(self):
         # type: () -> None
         if not self.upgrade_state:
@@ -603,45 +857,13 @@ class CephadmUpgrade:
                 })
 
         daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
-        done = 0
+        upgraded_daemon_count: int = 0
         for daemon_type in CEPH_UPGRADE_ORDER:
             logger.debug('Upgrade: Checking %s daemons' % daemon_type)
-            need_upgrade_self = False
-            need_upgrade: List[Tuple[DaemonDescription, bool]] = []
-            need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
-            for d in daemons:
-                if d.daemon_type != daemon_type:
-                    continue
-                assert d.daemon_type is not None
-                assert d.daemon_id is not None
-                correct_digest = False
-                if (any(d in target_digests for d in (d.container_image_digests or []))
-                        or d.daemon_type in MONITORING_STACK_TYPES):
-                    logger.debug('daemon %s.%s container digest correct' % (
-                        daemon_type, d.daemon_id))
-                    correct_digest = True
-                    if any(d in target_digests for d in (d.deployed_by or [])):
-                        logger.debug('daemon %s.%s deployed by correct version' % (
-                            d.daemon_type, d.daemon_id))
-                        done += 1
-                        continue
-
-                if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
-                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
-                                self.mgr.get_mgr_id())
-                    need_upgrade_self = True
-                    continue
-
-                if correct_digest:
-                    logger.debug('daemon %s.%s not deployed by correct version' % (
-                        d.daemon_type, d.daemon_id))
-                    need_upgrade_deployer.append((d, True))
-                else:
-                    logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
-                        daemon_type, d.daemon_id,
-                        d.container_image_name, d.container_image_digests, d.version))
-                    need_upgrade.append((d, False))
+            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(daemons, target_digests)
+            upgraded_daemon_count += done
+            self._update_upgrade_progress(upgraded_daemon_count / len(self.mgr.cache.get_daemons()))
 
             if not need_upgrade_self:
                 # only after the mgr itself is upgraded can we expect daemons to have
@@ -659,114 +881,10 @@ class CephadmUpgrade:
 
             if need_upgrade:
                 self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)
 
-            to_upgrade: List[Tuple[DaemonDescription, bool]] = []
-            known_ok_to_stop: List[str] = []
-            for d_entry in need_upgrade:
-                d = d_entry[0]
-                assert d.daemon_type is not None
-                assert d.daemon_id is not None
-                assert d.hostname is not None
-
-                if not d.container_image_id:
-                    if d.container_image_name == target_image:
-                        logger.debug(
-                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
-                        continue
-
-                if known_ok_to_stop:
-                    if d.name() in known_ok_to_stop:
-                        logger.info(f'Upgrade: {d.name()} is also safe to restart')
-                        to_upgrade.append(d_entry)
-                    continue
-
-                if d.daemon_type == 'osd':
-                    # NOTE: known_ok_to_stop is an output argument for
-                    # _wait_for_ok_to_stop
-                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
-                        return
-
-                if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
-                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
-                        return
-
-                if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
-                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
-                        return
-
-                to_upgrade.append(d_entry)
-
-                # if we don't have a list of others to consider, stop now
-                if not known_ok_to_stop:
-                    break
-
-            num = 1
-            for d_entry in to_upgrade:
-                d = d_entry[0]
-                assert d.daemon_type is not None
-                assert d.daemon_id is not None
-                assert d.hostname is not None
-
-                self._update_upgrade_progress(done / len(daemons))
-
-                # make sure host has latest container image
-                out, errs, code = CephadmServe(self.mgr)._run_cephadm(
-                    d.hostname, '', 'inspect-image', [],
-                    image=target_image, no_fsid=True, error_ok=True)
-                if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
-                    logger.info('Upgrade: Pulling %s on %s' % (target_image,
-                                                               d.hostname))
-                    self.upgrade_info_str = 'Pulling %s image on host %s' % (
-                        target_image, d.hostname)
-                    out, errs, code = CephadmServe(self.mgr)._run_cephadm(
-                        d.hostname, '', 'pull', [],
-                        image=target_image, no_fsid=True, error_ok=True)
-                    if code:
-                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
-                            'severity': 'warning',
-                            'summary': 'Upgrade: failed to pull target image',
-                            'count': 1,
-                            'detail': [
-                                'failed to pull %s on host %s' % (target_image,
-                                                                  d.hostname)],
-                        })
-                        return
-                    r = json.loads(''.join(out))
-                    if not any(d in target_digests for d in r.get('repo_digests', [])):
-                        logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
-                            target_image, d.hostname, r['repo_digests'], target_digests))
-                        self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
-                            target_image, d.hostname, r['repo_digests'], target_digests)
-                        self.upgrade_state.target_digests = r['repo_digests']
-                        self._save_upgrade_state()
-                        return
-
-                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)
-
-                if len(to_upgrade) > 1:
-                    logger.info('Upgrade: Updating %s.%s (%d/%d)' %
-                                (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
-                else:
-                    logger.info('Upgrade: Updating %s.%s' %
-                                (d.daemon_type, d.daemon_id))
-                action = 'Upgrading' if not d_entry[1] else 'Redeploying'
-                try:
-                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
-                    self.mgr._daemon_action(
-                        daemon_spec,
-                        'redeploy',
-                        image=target_image if not d_entry[1] else None
-                    )
-                except Exception as e:
-                    self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
-                        'severity': 'warning',
-                        'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
-                        'count': 1,
-                        'detail': [
-                            f'Upgrade daemon: {d.name()}: {e}'
-                        ],
-                    })
-                    return
-                num += 1
+            _continue, to_upgrade = self._to_upgrade(need_upgrade, target_image)
+            if not _continue:
+                return
+            self._upgrade_daemons(to_upgrade, target_image, target_digests)
 
             if to_upgrade:
                 return
@@ -776,26 +894,7 @@ class CephadmUpgrade:
                     logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True
 
-            if need_upgrade_self:
-                try:
-                    self.mgr.mgr_service.fail_over()
-                except OrchestratorError as e:
-                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
-                        'severity': 'warning',
-                        'summary': f'Upgrade: {e}',
-                        'count': 1,
-                        'detail': [
-                            'The upgrade process needs to upgrade the mgr, '
-                            'but it needs at least one standby to proceed.',
-                        ],
-                    })
-                    return
-
-                return  # unreachable code, as fail_over never returns
-            elif daemon_type == 'mgr':
-                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
-                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
-                    self.mgr.set_health_checks(self.mgr.health_checks)
+            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')
 
             # make sure 'ceph versions' agrees
             ret, out_ver, err = self.mgr.check_mon_command({
@@ -809,79 +908,17 @@ class CephadmUpgrade:
                         'Upgrade: %d %s daemon(s) are %s != target %s' %
                         (count, daemon_type, short_version, target_version))
 
-            # push down configs
-            daemon_type_section = name_to_config_section(daemon_type)
-            if image_settings.get(daemon_type_section) != target_image:
-                logger.info('Upgrade: Setting container_image for all %s' %
-                            daemon_type)
-                self.mgr.set_container_image(daemon_type_section, target_image)
-            to_clean = []
-            for section in image_settings.keys():
-                if section.startswith(name_to_config_section(daemon_type) + '.'):
-                    to_clean.append(section)
-            if to_clean:
-                logger.debug('Upgrade: Cleaning up container_image for %s' %
-                             to_clean)
-                for section in to_clean:
-                    ret, image, err = self.mgr.check_mon_command({
-                        'prefix': 'config rm',
-                        'name': 'container_image',
-                        'who': section,
-                    })
+            self._set_container_images(daemon_type, target_image, image_settings)
 
             logger.debug('Upgrade: All %s daemons are up to date.' % daemon_type)
 
             # complete osd upgrade?
             if daemon_type == 'osd':
-                osdmap = self.mgr.get("osd_map")
-                osd_min_name = osdmap.get("require_osd_release", "argonaut")
-                osd_min = ceph_release_to_major(osd_min_name)
-                if osd_min < int(target_major):
-                    logger.info(
-                        f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
-                    ret, _, err = self.mgr.check_mon_command({
-                        'prefix': 'osd require-osd-release',
-                        'release': target_major_name,
-                    })
+                self._complete_osd_upgrade(target_major, target_major_name)
 
             # complete mds upgrade?
             if daemon_type == 'mds':
-                if self.upgrade_state.fs_original_max_mds:
-                    for fs in self.mgr.get("fs_map")['filesystems']:
-                        fscid = fs["id"]
-                        fs_name = fs['mdsmap']['fs_name']
-                        new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
-                        if new_max > 1:
-                            self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
-                                fs_name, new_max
-                            ))
-                            ret, _, err = self.mgr.check_mon_command({
-                                'prefix': 'fs set',
-                                'fs_name': fs_name,
-                                'var': 'max_mds',
-                                'val': str(new_max),
-                            })
-
-                    self.upgrade_state.fs_original_max_mds = {}
-                    self._save_upgrade_state()
-                if self.upgrade_state.fs_original_allow_standby_replay:
-                    for fs in self.mgr.get("fs_map")['filesystems']:
-                        fscid = fs["id"]
-                        fs_name = fs['mdsmap']['fs_name']
-                        asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
-                        if asr:
-                            self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
-                                fs_name
-                            ))
-                            ret, _, err = self.mgr.check_mon_command({
-                                'prefix': 'fs set',
-                                'fs_name': fs_name,
-                                'var': 'allow_standby_replay',
-                                'val': '1'
-                            })
-
-                    self.upgrade_state.fs_original_allow_standby_replay = {}
-                    self._save_upgrade_state()
+                self._complete_mds_upgrade()
 
         # clean up
         logger.info('Upgrade: Finalizing container_image settings')
-- 
2.39.5