From 3072b1d8cbc8b5ac83697007767e12ff805a693d Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Mon, 3 Aug 2020 15:51:53 +0200 Subject: [PATCH] mgr/cephadm: Add UpgradeState class My resons are: * This increase mypy coverage * Makes the code more discoverable. * Allowes us to have intelligent UpgradeState properties Signed-off-by: Sebastian Wagner --- src/pybind/mgr/cephadm/upgrade.py | 131 ++++++++++++++++++++---------- 1 file changed, 86 insertions(+), 45 deletions(-) diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py index 5fd031de5cf..9f3aea4f97a 100644 --- a/src/pybind/mgr/cephadm/upgrade.py +++ b/src/pybind/mgr/cephadm/upgrade.py @@ -18,24 +18,56 @@ CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror'] logger = logging.getLogger(__name__) + +class UpgradeState: + def __init__(self, + target_name: str, + progress_id: str, + target_id: Optional[str] = None, + target_version: Optional[str] = None, + error: Optional[str] = None, + paused: Optional[bool] = None, + ): + self.target_name: str = target_name + self.progress_id: str = progress_id + self.target_id: Optional[str] = target_id + self.target_version: Optional[str] = target_version + self.error: Optional[str] = error + self.paused: bool = paused or False + + def to_json(self) -> dict: + return { + 'target_name': self.target_name, + 'progress_id': self.progress_id, + 'target_id': self.target_id, + 'target_version': self.target_version, + 'error': self.error, + 'paused': self.paused, + } + + @classmethod + def from_json(cls, data) -> 'UpgradeState': + return cls(**data) + + class CephadmUpgrade: def __init__(self, mgr: "CephadmOrchestrator"): self.mgr = mgr t = self.mgr.get_store('upgrade_state') if t: - self.upgrade_state = json.loads(t) + self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t)) else: self.upgrade_state = None def upgrade_status(self) -> orchestrator.UpgradeStatusSpec: r = orchestrator.UpgradeStatusSpec() if self.upgrade_state: - r.target_image = self.upgrade_state.get('target_name') + r.target_image = self.upgrade_state.target_name r.in_progress = True - if self.upgrade_state.get('error'): - r.message = 'Error: ' + self.upgrade_state.get('error') - elif self.upgrade_state.get('paused'): + if self.upgrade_state.error: + r.message = 'Error: ' + self.upgrade_state.error + elif self.upgrade_state.paused: r.message = 'Upgrade paused' return r @@ -58,19 +90,19 @@ class CephadmUpgrade: else: raise OrchestratorError('must specify either image or version') if self.upgrade_state: - if self.upgrade_state.get('target_name') != target_name: + if self.upgrade_state.target_name != target_name: raise OrchestratorError( 'Upgrade to %s (not %s) already in progress' % - (self.upgrade_state.get('target_name'), target_name)) - if self.upgrade_state.get('paused'): - del self.upgrade_state['paused'] + (self.upgrade_state.target_name, target_name)) + if self.upgrade_state.paused: + self.upgrade_state.paused = False self._save_upgrade_state() - return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name') - return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name') - self.upgrade_state = { - 'target_name': target_name, - 'progress_id': str(uuid.uuid4()), - } + return 'Resumed upgrade to %s' % self.upgrade_state.target_name + return 'Upgrade to %s in progress' % self.upgrade_state.target_name + self.upgrade_state = UpgradeState( + target_name=target_name, + progress_id=str(uuid.uuid4()) + ) self._update_upgrade_progress(0.0) self._save_upgrade_state() self._clear_upgrade_health_checks() @@ -80,29 +112,29 @@ class CephadmUpgrade: def upgrade_pause(self) -> str: if not self.upgrade_state: raise OrchestratorError('No upgrade in progress') - if self.upgrade_state.get('paused'): - return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name') - self.upgrade_state['paused'] = True + if self.upgrade_state.paused: + return 'Upgrade to %s already paused' % self.upgrade_state.target_name + self.upgrade_state.paused = True self._save_upgrade_state() - return 'Paused upgrade to %s' % self.upgrade_state.get('target_name') + return 'Paused upgrade to %s' % self.upgrade_state.target_name def upgrade_resume(self) -> str: if not self.upgrade_state: raise OrchestratorError('No upgrade in progress') - if not self.upgrade_state.get('paused'): - return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name') - del self.upgrade_state['paused'] + if not self.upgrade_state.paused: + return 'Upgrade to %s not paused' % self.upgrade_state.target_name + self.upgrade_state.paused = False self._save_upgrade_state() self.mgr.event.set() - return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name') + return 'Resumed upgrade to %s' % self.upgrade_state.target_name def upgrade_stop(self) -> str: if not self.upgrade_state: return 'No upgrade in progress' - target_name = self.upgrade_state.get('target_name') - if 'progress_id' in self.upgrade_state: + target_name = self.upgrade_state.target_name + if self.upgrade_state.progress_id: self.mgr.remote('progress', 'complete', - self.upgrade_state['progress_id']) + self.upgrade_state.progress_id) self.upgrade_state = None self._save_upgrade_state() self._clear_upgrade_health_checks() @@ -114,7 +146,7 @@ class CephadmUpgrade: Returns false, if nothing was done. :return: """ - if self.upgrade_state and not self.upgrade_state.get('paused'): + if self.upgrade_state and not self.upgrade_state.paused: self._do_upgrade() return True return False @@ -123,7 +155,7 @@ class CephadmUpgrade: # only wait a little bit; the service might go away for something tries = 4 while tries > 0: - if not self.upgrade_state or self.upgrade_state.get('paused'): + if not self.upgrade_state or self.upgrade_state.paused: return False r = self.mgr.cephadm_services[s.daemon_type].ok_to_stop([s.daemon_id]) @@ -147,22 +179,31 @@ class CephadmUpgrade: def _fail_upgrade(self, alert_id, alert) -> None: logger.error('Upgrade: Paused due to %s: %s' % (alert_id, alert['summary'])) - self.upgrade_state['error'] = alert_id + ': ' + alert['summary'] - self.upgrade_state['paused'] = True + if not self.upgrade_state: + assert False, 'No upgrade in progress' + + self.upgrade_state.error = alert_id + ': ' + alert['summary'] + self.upgrade_state.paused = True self._save_upgrade_state() self.mgr.health_checks[alert_id] = alert self.mgr.set_health_checks(self.mgr.health_checks) def _update_upgrade_progress(self, progress) -> None: - if 'progress_id' not in self.upgrade_state: - self.upgrade_state['progress_id'] = str(uuid.uuid4()) + if not self.upgrade_state: + assert False, 'No upgrade in progress' + + if not self.upgrade_state.progress_id: + self.upgrade_state.progress_id = str(uuid.uuid4()) self._save_upgrade_state() - self.mgr.remote('progress', 'update', self.upgrade_state['progress_id'], - ev_msg='Upgrade to %s' % self.upgrade_state['target_name'], + self.mgr.remote('progress', 'update', self.upgrade_state.progress_id, + ev_msg='Upgrade to %s' % self.upgrade_state.target_name, ev_progress=progress) def _save_upgrade_state(self) -> None: - self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state)) + if not self.upgrade_state: + self.mgr.set_store('upgrade_state', None) + return + self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json())) def _do_upgrade(self): # type: () -> None @@ -170,8 +211,8 @@ class CephadmUpgrade: logger.debug('_do_upgrade no state, exiting') return - target_name = self.upgrade_state.get('target_name') - target_id = self.upgrade_state.get('target_id', None) + target_name = self.upgrade_state.target_name + target_id = self.upgrade_state.target_id if not target_id: # need to learn the container hash logger.info('Upgrade: First pull of %s' % target_name) @@ -185,10 +226,10 @@ class CephadmUpgrade: 'detail': [str(e)], }) return - self.upgrade_state['target_id'] = target_id - self.upgrade_state['target_version'] = target_version + self.upgrade_state.target_id = target_id + self.upgrade_state.target_version = target_version self._save_upgrade_state() - target_version = self.upgrade_state.get('target_version') + target_version = self.upgrade_state.target_version logger.info('Upgrade: Target is %s with id %s' % (target_name, target_id)) @@ -250,7 +291,7 @@ class CephadmUpgrade: r = json.loads(''.join(out)) if r.get('image_id') != target_id: logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id)) - self.upgrade_state['target_id'] = r['image_id'] + self.upgrade_state.target_id = r['image_id'] self._save_upgrade_state() return @@ -310,10 +351,10 @@ class CephadmUpgrade: self.mgr.set_health_checks(self.mgr.health_checks) # make sure 'ceph versions' agrees - ret, out, err = self.mgr.check_mon_command({ + ret, out_ver, err = self.mgr.check_mon_command({ 'prefix': 'versions', }) - j = json.loads(out) + j = json.loads(out_ver) for version, count in j.get(daemon_type, {}).items(): if version != target_version: logger.warning( @@ -363,9 +404,9 @@ class CephadmUpgrade: }) logger.info('Upgrade: Complete!') - if 'progress_id' in self.upgrade_state: + if self.upgrade_state.progress_id: self.mgr.remote('progress', 'complete', - self.upgrade_state['progress_id']) + self.upgrade_state.progress_id) self.upgrade_state = None self._save_upgrade_state() return -- 2.39.5