error: Optional[str] = None,
paused: Optional[bool] = None,
fs_original_max_mds: Optional[Dict[str, int]] = None,
- fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None
+ fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
+ daemon_types: Optional[List[str]] = None,
+ hosts: Optional[List[str]] = None,
+ services: Optional[List[str]] = None,
+ total_count: Optional[int] = None,
+ remaining_count: Optional[int] = None,
):
self._target_name: str = target_name # Use CephadmUpgrade.target_image instead.
self.progress_id: str = progress_id
self.error: Optional[str] = error
self.paused: bool = paused or False
self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
- self.fs_original_allow_standby_replay: Optional[Dict[str, bool]] = fs_original_allow_standby_replay
+ self.fs_original_allow_standby_replay: Optional[Dict[str,
+ bool]] = fs_original_allow_standby_replay
+ self.daemon_types = daemon_types
+ self.hosts = hosts
+ self.services = services
+ self.total_count = total_count
+ self.remaining_count = remaining_count
def to_json(self) -> dict:
return {
'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
'error': self.error,
'paused': self.paused,
+ 'daemon_types': self.daemon_types,
+ 'hosts': self.hosts,
+ 'services': self.services,
+ 'total_count': self.total_count,
+ 'remaining_count': self.remaining_count,
}
@classmethod
r.target_image = self.target_image
r.in_progress = True
r.progress, r.services_complete = self._get_upgrade_info()
+
+ if self.upgrade_state.daemon_types is not None:
+ which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
+ if self.upgrade_state.hosts is not None:
+ which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
+ elif self.upgrade_state.services is not None:
+ which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}'
+ if self.upgrade_state.hosts is not None:
+ which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
+ elif self.upgrade_state.hosts is not None:
+ which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
+ else:
+ which_str = 'Upgrading all daemon types on all hosts'
+ if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
+ which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).'
+ r.which = which_str
+
# accessing self.upgrade_info_str will throw an exception if it
# has not been set in _do_upgrade yet
try:
if not self.upgrade_state or not self.upgrade_state.target_digests:
return '', []
- daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
+ daemons = self._get_filtered_daemons()
if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
return '', []
return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types
+ def _get_filtered_daemons(self) -> List[DaemonDescription]:
+ # Return the set of daemons set to be upgraded with out current
+ # filtering parameters (or all daemons in upgrade order if no filtering
+ # parameter are set).
+ assert self.upgrade_state is not None
+ if self.upgrade_state.daemon_types is not None:
+ daemons = [d for d in self.mgr.cache.get_daemons(
+ ) if d.daemon_type in self.upgrade_state.daemon_types]
+ elif self.upgrade_state.services is not None:
+ daemons = []
+ for service in self.upgrade_state.services:
+ daemons += self.mgr.cache.get_daemons_by_service(service)
+ else:
+ daemons = [d for d in self.mgr.cache.get_daemons(
+ ) if d.daemon_type in CEPH_UPGRADE_ORDER]
+ if self.upgrade_state.hosts is not None:
+ daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
+ return daemons
+
+ def _get_current_version(self) -> Tuple[int, int, str]:
+ current_version = self.mgr.version.split('ceph version ')[1]
+ (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
+ return (int(current_major), int(current_minor), current_version)
+
def _check_target_version(self, version: str) -> Optional[str]:
try:
(major, minor, _) = version.split('.', 2)
self.mgr.log.info('Upgrade: Started with target %s' % target_name)
self.upgrade_state = UpgradeState(
target_name=target_name,
- progress_id=str(uuid.uuid4())
+ progress_id=str(uuid.uuid4()),
+ daemon_types=daemon_types,
+ hosts=hosts,
+ services=services,
+ total_count=limit,
+ remaining_count=limit,
)
self._update_upgrade_progress(0.0)
self._save_upgrade_state()
if target_digests is None:
target_digests = []
for d_entry in to_upgrade:
+ if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
+ self.mgr.log.info(f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
+ return
d = d_entry[0]
assert d.daemon_type is not None
assert d.daemon_id is not None
self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)
if len(to_upgrade) > 1:
- logger.info('Upgrade: Updating %s.%s (%d/%d)' %
- (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
+ logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade), self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
else:
logger.info('Upgrade: Updating %s.%s' %
(d.daemon_type, d.daemon_id))
})
return
num += 1
+ if self.upgrade_state.remaining_count is not None and not d_entry[1]:
+ self.upgrade_state.remaining_count -= 1
+ self._save_upgrade_state()
def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
if need_upgrade_self:
self.upgrade_state.fs_original_allow_standby_replay = {}
self._save_upgrade_state()
+ def _mark_upgrade_complete(self) -> None:
+ if not self.upgrade_state:
+ logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting')
+ return
+ logger.info('Upgrade: Complete!')
+ if self.upgrade_state.progress_id:
+ self.mgr.remote('progress', 'complete',
+ self.upgrade_state.progress_id)
+ self.upgrade_state = None
+ self._save_upgrade_state()
+
def _do_upgrade(self):
# type: () -> None
if not self.upgrade_state:
'who': 'mon',
})
- daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
+ if self.upgrade_state.daemon_types is not None:
+ logger.debug(f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
+ daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in self.upgrade_state.daemon_types]
+ elif self.upgrade_state.services is not None:
+ logger.debug(f'Filtering daemons to upgrade by services: {self.upgrade_state.daemon_types}')
+ daemons = []
+ for service in self.upgrade_state.services:
+ daemons += self.mgr.cache.get_daemons_by_service(service)
+ else:
+ daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
+ if self.upgrade_state.hosts is not None:
+ logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
+ daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
upgraded_daemon_count: int = 0
for daemon_type in CEPH_UPGRADE_ORDER:
+ if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0:
+ # we hit our limit and should end the upgrade
+ # except for cases where we only need to redeploy, but not actually upgrade
+ # the image (which we don't count towards our limit). This case only occurs with mgr
+ # and monitoring stack daemons. Additionally, this case is only valid if
+ # the active mgr is already upgraded.
+ if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
+ if daemon_type not in MONITORING_STACK_TYPES and daemon_type != 'mgr':
+ continue
+ else:
+ self._mark_upgrade_complete()
+ return
logger.debug('Upgrade: Checking %s daemons' % daemon_type)
+ daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]
- need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(daemons, target_digests)
+ need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(daemons_of_type, target_digests)
upgraded_daemon_count += done
- self._update_upgrade_progress(upgraded_daemon_count / len(self.mgr.cache.get_daemons()))
+ self._update_upgrade_progress(upgraded_daemon_count / len(daemons))
+
+ # make sure mgr and monitoring stack daemons are properly redeployed in staggered upgrade scenarios
+ if daemon_type == 'mgr' or daemon_type in MONITORING_STACK_TYPES:
+ if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
+ need_upgrade_names = [d[0].name() for d in need_upgrade] + [d[0].name() for d in need_upgrade_deployer]
+ dds = [d for d in self.mgr.cache.get_daemons_by_type(daemon_type) if d.name() not in need_upgrade_names]
+ need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests)
+ if not n1:
+ if not need_upgrade_self and need_upgrade_active:
+ need_upgrade_self = True
+ need_upgrade_deployer += n2
+ else:
+ # no point in trying to redeploy with new version if active mgr is not on the new version
+ need_upgrade_deployer = []
if not need_upgrade_self:
# only after the mgr itself is upgraded can we expect daemons to have
if to_upgrade:
return
+ self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')
+
+ # following bits of _do_upgrade are for completing upgrade for given
+ # types. If we haven't actually finished upgrading all the daemons
+ # of this type, we should exit the loop here
+ _, n1, n2, _ = self._detect_need_upgrade(self.mgr.cache.get_daemons_by_type(daemon_type), target_digests)
+ if n1 or n2:
+ continue
+
# complete mon upgrade?
if daemon_type == 'mon':
if not self.mgr.get("have_local_config_map"):
if daemon_type == 'mds':
self._complete_mds_upgrade()
+ logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type)
+
# clean up
logger.info('Upgrade: Finalizing container_image settings')
self.mgr.set_container_image('global', target_image)
'who': 'mon',
})
- logger.info('Upgrade: Complete!')
- if self.upgrade_state.progress_id:
- self.mgr.remote('progress', 'complete',
- self.upgrade_state.progress_id)
- self.upgrade_state = None
- self._save_upgrade_state()
+ self._mark_upgrade_complete()
return