From e6b0fe0e4859f83ca69d14d89f9e47f0ea74e770 Mon Sep 17 00:00:00 2001 From: Adam King Date: Wed, 30 Mar 2022 09:49:56 -0400 Subject: [PATCH] mgr/cephadm: add new args and validation for staggered upgrade Signed-off-by: Adam King --- src/pybind/mgr/cephadm/module.py | 32 +++++++- src/pybind/mgr/cephadm/upgrade.py | 95 ++++++++++++++++++++++- src/pybind/mgr/orchestrator/_interface.py | 3 +- src/pybind/mgr/orchestrator/module.py | 8 +- 4 files changed, 131 insertions(+), 7 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index fdb6ca616c882..53e08335c25da 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -60,7 +60,7 @@ from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, from .upgrade import CephadmUpgrade from .template import TemplateMgr from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \ - cephadmNoImage + cephadmNoImage, CEPH_UPGRADE_ORDER from .configchecks import CephadmConfigChecks from .offline_watcher import OfflineHostWatcher @@ -2692,10 +2692,36 @@ Then run the following: return self.upgrade.upgrade_ls(image, tags, show_all_versions) @handle_orch_error - def upgrade_start(self, image: str, version: str) -> str: + def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None, + services: Optional[List[str]] = None, limit: Optional[int] = None) -> str: if self.inventory.get_host_with_state("maintenance"): raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state") - return self.upgrade.upgrade_start(image, version) + if daemon_types is not None and services is not None: + raise OrchestratorError('--daemon-types and --services are mutually exclusive') + if daemon_types is not None: + for dtype in daemon_types: + if dtype not in CEPH_UPGRADE_ORDER: + raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n' + f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}') + if services is not None: + for service in services: + if service not in self.spec_store: + raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n' + f'Known services are: {self.spec_store.all_specs.keys()}') + hosts: Optional[List[str]] = None + if host_placement is not None: + all_hosts = list(self.inventory.all_specs()) + placement = PlacementSpec.from_string(host_placement) + hosts = placement.filter_matching_hostspecs(all_hosts) + if not hosts: + raise OrchestratorError( + f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts') + + if limit is not None: + if limit < 1: + raise OrchestratorError(f'Upgrade aborted - --limit arg must be a positive integer, not {limit}') + + return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit) @handle_orch_error def upgrade_pause(self) -> str: diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py index 1d959f3a6b006..c4e4f74b278d3 100644 --- a/src/pybind/mgr/cephadm/upgrade.py +++ b/src/pybind/mgr/cephadm/upgrade.py @@ -8,7 +8,8 @@ import orchestrator from cephadm.registry import Registry from cephadm.serve import CephadmServe from cephadm.services.cephadmservice import CephadmDaemonDeploySpec -from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, MONITORING_STACK_TYPES +from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, \ + MONITORING_STACK_TYPES, CEPH_TYPES, GATEWAY_TYPES from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service if TYPE_CHECKING: @@ -239,7 +240,8 @@ class CephadmUpgrade: r["tags"] = sorted(ls) return r - def upgrade_start(self, image: str, version: str) -> str: + def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, + hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str: if self.mgr.mode != 'root': raise OrchestratorError('upgrade is not supported in %s mode' % ( self.mgr.mode)) @@ -252,6 +254,10 @@ class CephadmUpgrade: target_name = normalize_image_digest(image, self.mgr.default_registry) else: raise OrchestratorError('must specify either image or version') + + if daemon_types is not None or services is not None or hosts is not None: + self._validate_upgrade_filters(target_name, daemon_types, hosts, services) + if self.upgrade_state: if self.upgrade_state._target_name != target_name: raise OrchestratorError( @@ -280,6 +286,91 @@ class CephadmUpgrade: self.mgr.event.set() return 'Initiating upgrade to %s' % (target_name) + def _validate_upgrade_filters(self, target_name: str, daemon_types: Optional[List[str]] = None, hosts: Optional[List[str]] = None, services: Optional[List[str]] = None) -> None: + def _latest_type(dtypes: List[str]) -> str: + # [::-1] gives the list in reverse + for daemon_type in CEPH_UPGRADE_ORDER[::-1]: + if daemon_type in dtypes: + return daemon_type + return '' + + def _get_earlier_daemons(dtypes: List[str], candidates: List[DaemonDescription]) -> List[DaemonDescription]: + # this function takes a list of daemon types and first finds the daemon + # type from that list that is latest in our upgrade order. Then, from + # that latest type, it filters the list of candidate daemons received + # for daemons with types earlier in the upgrade order than the latest + # type found earlier. That filtered list of daemons is returned. The + # purpose of this function is to help in finding daemons that must have + # already been upgraded for the given filtering parameters (--daemon-types, + # --services, --hosts) to be valid. + latest = _latest_type(dtypes) + if not latest: + return [] + earlier_types = '|'.join(CEPH_UPGRADE_ORDER).split(latest)[0].split('|')[:-1] + earlier_types = [t for t in earlier_types if t not in dtypes] + return [d for d in candidates if d.daemon_type in earlier_types] + + if self.upgrade_state: + raise OrchestratorError('Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.') + try: + target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(target_name)) + except OrchestratorError as e: + raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}') + # what we need to do here is build a list of daemons that must already be upgraded + # in order for the user's selection of daemons to upgrade to be valid. for example, + # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block. + daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type not in MONITORING_STACK_TYPES] + err_msg_base = 'Cannot start upgrade. ' + # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters + dtypes = [] + if daemon_types is not None: + dtypes = daemon_types + if hosts is not None: + dtypes = [_latest_type(dtypes)] + other_host_daemons = [ + d for d in daemons if d.hostname is not None and d.hostname not in hosts] + daemons = _get_earlier_daemons(dtypes, other_host_daemons) + else: + daemons = _get_earlier_daemons(dtypes, daemons) + err_msg_base += 'Daemons with types earlier in upgrade order than given types need upgrading.\n' + elif services is not None: + # for our purposes here we can effectively convert our list of services into the + # set of daemon types the services contain. This works because we don't allow --services + # and --daemon-types at the same time and we only allow services of the same type + sspecs = [self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None] + stypes = list(set([s.service_type for s in sspecs])) + if len(stypes) != 1: + raise OrchestratorError('Doing upgrade by service only support services of one type at ' + f'a time. Found service types: {stypes}') + for stype in stypes: + dtypes += orchestrator.service_to_daemon_types(stype) + dtypes = list(set(dtypes)) + if hosts is not None: + other_host_daemons = [ + d for d in daemons if d.hostname is not None and d.hostname not in hosts] + daemons = _get_earlier_daemons(dtypes, other_host_daemons) + else: + daemons = _get_earlier_daemons(dtypes, daemons) + err_msg_base += 'Daemons with types earlier in upgrade order than daemons from given services need upgrading.\n' + elif hosts is not None: + # hosts must be handled a bit differently. For this, we really need to find all the daemon types + # that reside on hosts in the list of hosts we will upgrade. Then take the type from + # that list that is latest in the upgrade order and check if any daemons on hosts not in the + # provided list of hosts have a daemon with a type earlier in the upgrade order that is not upgraded. + dtypes = list(set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts])) + other_hosts_daemons = [d for d in daemons if d.hostname is not None and d.hostname not in hosts] + daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons) + err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n' + need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests) + if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)): + # also report active mgr as needing to be upgraded. It is not included in the resulting list + # by default as it is treated special and handled via the need_upgrade_self bool + n1.insert(0, (self.mgr.mgr_service.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr')), True)) + if n1 or n2: + raise OrchestratorError(f'{err_msg_base}Please first upgrade ' + f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n' + f'NOTE: Enforced upgrade order is: {" -> ".join(CEPH_TYPES + GATEWAY_TYPES)}') + def upgrade_pause(self) -> str: if not self.upgrade_state: raise OrchestratorError('No upgrade in progress') diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 214df05ee4bf6..6622372f87cb1 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -674,7 +674,8 @@ class Orchestrator(object): def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]: raise NotImplementedError() - def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]: + def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]], + hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]: raise NotImplementedError() def upgrade_pause(self) -> OrchResult[str]: diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index 96222a64a4311..3ce7fb907fb35 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -1435,10 +1435,16 @@ Usage: def _upgrade_start(self, image: Optional[str] = None, _end_positional_: int = 0, + daemon_types: Optional[str] = None, + hosts: Optional[str] = None, + services: Optional[str] = None, + limit: Optional[int] = None, ceph_version: Optional[str] = None) -> HandleCommandResult: """Initiate upgrade""" self._upgrade_check_image_name(image, ceph_version) - completion = self.upgrade_start(image, ceph_version) + dtypes = daemon_types.split(',') if daemon_types is not None else None + service_names = services.split(',') if services is not None else None + completion = self.upgrade_start(image, ceph_version, dtypes, hosts, service_names, limit) raise_if_exception(completion) return HandleCommandResult(stdout=completion.result_str()) -- 2.39.5