from .upgrade import CephadmUpgrade
from .template import TemplateMgr
from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
- cephadmNoImage
+ cephadmNoImage, CEPH_UPGRADE_ORDER
from .configchecks import CephadmConfigChecks
from .offline_watcher import OfflineHostWatcher
return self.upgrade.upgrade_ls(image, tags, show_all_versions)
@handle_orch_error
- def upgrade_start(self, image: str, version: str) -> str:
+ def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None,
+ services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
if self.inventory.get_host_with_state("maintenance"):
raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state")
- return self.upgrade.upgrade_start(image, version)
+ if daemon_types is not None and services is not None:
+ raise OrchestratorError('--daemon-types and --services are mutually exclusive')
+ if daemon_types is not None:
+ for dtype in daemon_types:
+ if dtype not in CEPH_UPGRADE_ORDER:
+ raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n'
+ f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}')
+ if services is not None:
+ for service in services:
+ if service not in self.spec_store:
+ raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n'
+ f'Known services are: {self.spec_store.all_specs.keys()}')
+ hosts: Optional[List[str]] = None
+ if host_placement is not None:
+ all_hosts = list(self.inventory.all_specs())
+ placement = PlacementSpec.from_string(host_placement)
+ hosts = placement.filter_matching_hostspecs(all_hosts)
+ if not hosts:
+ raise OrchestratorError(
+ f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')
+
+ if limit is not None:
+ if limit < 1:
+ raise OrchestratorError(f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')
+
+ return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)
@handle_orch_error
def upgrade_pause(self) -> str:
from cephadm.registry import Registry
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
-from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, MONITORING_STACK_TYPES
+from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, \
+ MONITORING_STACK_TYPES, CEPH_TYPES, GATEWAY_TYPES
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service
if TYPE_CHECKING:
r["tags"] = sorted(ls)
return r
- def upgrade_start(self, image: str, version: str) -> str:
+ def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
+ hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
if self.mgr.mode != 'root':
raise OrchestratorError('upgrade is not supported in %s mode' % (
self.mgr.mode))
target_name = normalize_image_digest(image, self.mgr.default_registry)
else:
raise OrchestratorError('must specify either image or version')
+
+ if daemon_types is not None or services is not None or hosts is not None:
+ self._validate_upgrade_filters(target_name, daemon_types, hosts, services)
+
if self.upgrade_state:
if self.upgrade_state._target_name != target_name:
raise OrchestratorError(
self.mgr.event.set()
return 'Initiating upgrade to %s' % (target_name)
+ def _validate_upgrade_filters(self, target_name: str, daemon_types: Optional[List[str]] = None, hosts: Optional[List[str]] = None, services: Optional[List[str]] = None) -> None:
+ def _latest_type(dtypes: List[str]) -> str:
+ # [::-1] gives the list in reverse
+ for daemon_type in CEPH_UPGRADE_ORDER[::-1]:
+ if daemon_type in dtypes:
+ return daemon_type
+ return ''
+
+ def _get_earlier_daemons(dtypes: List[str], candidates: List[DaemonDescription]) -> List[DaemonDescription]:
+ # this function takes a list of daemon types and first finds the daemon
+ # type from that list that is latest in our upgrade order. Then, from
+ # that latest type, it filters the list of candidate daemons received
+ # for daemons with types earlier in the upgrade order than the latest
+ # type found earlier. That filtered list of daemons is returned. The
+ # purpose of this function is to help in finding daemons that must have
+ # already been upgraded for the given filtering parameters (--daemon-types,
+ # --services, --hosts) to be valid.
+ latest = _latest_type(dtypes)
+ if not latest:
+ return []
+ earlier_types = '|'.join(CEPH_UPGRADE_ORDER).split(latest)[0].split('|')[:-1]
+ earlier_types = [t for t in earlier_types if t not in dtypes]
+ return [d for d in candidates if d.daemon_type in earlier_types]
+
+ if self.upgrade_state:
+ raise OrchestratorError('Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
+ try:
+ target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(target_name))
+ except OrchestratorError as e:
+ raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
+ # what we need to do here is build a list of daemons that must already be upgraded
+ # in order for the user's selection of daemons to upgrade to be valid. for example,
+ # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block.
+ daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type not in MONITORING_STACK_TYPES]
+ err_msg_base = 'Cannot start upgrade. '
+ # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters
+ dtypes = []
+ if daemon_types is not None:
+ dtypes = daemon_types
+ if hosts is not None:
+ dtypes = [_latest_type(dtypes)]
+ other_host_daemons = [
+ d for d in daemons if d.hostname is not None and d.hostname not in hosts]
+ daemons = _get_earlier_daemons(dtypes, other_host_daemons)
+ else:
+ daemons = _get_earlier_daemons(dtypes, daemons)
+ err_msg_base += 'Daemons with types earlier in upgrade order than given types need upgrading.\n'
+ elif services is not None:
+ # for our purposes here we can effectively convert our list of services into the
+ # set of daemon types the services contain. This works because we don't allow --services
+ # and --daemon-types at the same time and we only allow services of the same type
+ sspecs = [self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
+ stypes = list(set([s.service_type for s in sspecs]))
+ if len(stypes) != 1:
+ raise OrchestratorError('Doing upgrade by service only support services of one type at '
+ f'a time. Found service types: {stypes}')
+ for stype in stypes:
+ dtypes += orchestrator.service_to_daemon_types(stype)
+ dtypes = list(set(dtypes))
+ if hosts is not None:
+ other_host_daemons = [
+ d for d in daemons if d.hostname is not None and d.hostname not in hosts]
+ daemons = _get_earlier_daemons(dtypes, other_host_daemons)
+ else:
+ daemons = _get_earlier_daemons(dtypes, daemons)
+ err_msg_base += 'Daemons with types earlier in upgrade order than daemons from given services need upgrading.\n'
+ elif hosts is not None:
+ # hosts must be handled a bit differently. For this, we really need to find all the daemon types
+ # that reside on hosts in the list of hosts we will upgrade. Then take the type from
+ # that list that is latest in the upgrade order and check if any daemons on hosts not in the
+ # provided list of hosts have a daemon with a type earlier in the upgrade order that is not upgraded.
+ dtypes = list(set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
+ other_hosts_daemons = [d for d in daemons if d.hostname is not None and d.hostname not in hosts]
+ daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons)
+ err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n'
+ need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests)
+ if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)):
+ # also report active mgr as needing to be upgraded. It is not included in the resulting list
+ # by default as it is treated special and handled via the need_upgrade_self bool
+ n1.insert(0, (self.mgr.mgr_service.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr')), True))
+ if n1 or n2:
+ raise OrchestratorError(f'{err_msg_base}Please first upgrade '
+ f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n'
+ f'NOTE: Enforced upgrade order is: {" -> ".join(CEPH_TYPES + GATEWAY_TYPES)}')
+
def upgrade_pause(self) -> str:
if not self.upgrade_state:
raise OrchestratorError('No upgrade in progress')