import logging
import time
import uuid
-from typing import TYPE_CHECKING, Optional, Dict, List
+from typing import TYPE_CHECKING, Optional, Dict, List, Tuple
import orchestrator
from cephadm.serve import CephadmServe
if self.upgrade_state:
r.target_image = self.target_image
r.in_progress = True
+ r.progress, r.services_complete = self._get_upgrade_info()
+ # accessing self.upgrade_info_str will throw an exception if it
+ # has not been set in _do_upgrade yet
+ try:
+ r.message = self.upgrade_info_str
+ except AttributeError:
+ pass
if self.upgrade_state.error:
r.message = 'Error: ' + self.upgrade_state.error
elif self.upgrade_state.paused:
r.message = 'Upgrade paused'
return r
+ def _get_upgrade_info(self) -> Tuple[str, List[str]]:
+ if not self.upgrade_state or not self.upgrade_state.target_digests:
+ return '', []
+
+ daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
+
+ if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
+ return '', []
+
+ completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in (
+ d.container_image_digests or []))) for d in daemons if d.daemon_type]
+
+ done = len([True for completion in completed_daemons if completion[1]])
+
+ completed_types = list(set([completion[0] for completion in completed_daemons if all(
+ c[1] for c in completed_daemons if c[0] == completion[0])]))
+
+ return '%s/%s ceph daemons upgraded' % (done, len(daemons)), completed_types
+
def _check_target_version(self, version: str) -> Optional[str]:
try:
(major, minor, patch) = version.split('.')
if not target_id or not target_version or not target_digests:
# need to learn the container hash
logger.info('Upgrade: First pull of %s' % target_image)
+ self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
try:
target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info(
target_image)
image_settings = self.get_distinct_container_image_settings()
- daemons = self.mgr.cache.get_daemons()
+ daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
done = 0
for daemon_type in CEPH_UPGRADE_ORDER:
logger.info('Upgrade: Checking %s daemons' % daemon_type)
):
return
+ if need_upgrade:
+ self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)
+
to_upgrade = []
known_ok_to_stop: List[str] = []
for d in need_upgrade:
if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
logger.info('Upgrade: Pulling %s on %s' % (target_image,
d.hostname))
+ self.upgrade_info_str = 'Pulling %s image on host %s' % (
+ target_image, d.hostname)
out, errs, code = CephadmServe(self.mgr)._run_cephadm(
d.hostname, '', 'pull', [],
image=target_image, no_fsid=True, error_ok=True)
if not any(d in target_digests for d in r.get('repo_digests', [])):
logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
target_image, d.hostname, r['repo_digests'], target_digests))
+ self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
+ target_image, d.hostname, r['repo_digests'], target_digests)
self.upgrade_state.target_digests = r['repo_digests']
self._save_upgrade_state()
return
+ self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)
+
if len(to_upgrade) > 1:
logger.info('Upgrade: Updating %s.%s (%d/%d)' %
(d.daemon_type, d.daemon_id, num, len(to_upgrade)))