From: Sage Weil Date: Fri, 13 Dec 2019 22:29:07 +0000 (-0600) Subject: mgr/cephadm: upgrade start/stop/pause/resume X-Git-Tag: v15.1.0~171^2~3 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=8fcbc1481f793321161aa52e8ea28a49bad1c0b2;p=ceph-ci.git mgr/cephadm: upgrade start/stop/pause/resume No actual upgrading yet. Signed-off-by: Sage Weil --- diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 79a0df1e4ef..ddf88133d3d 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -1,6 +1,8 @@ import json import errno import logging +import time +from threading import Event from functools import wraps import string @@ -322,6 +324,13 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator): self._reconfig_ssh() CephadmOrchestrator.instance = self + + t = self.get_store('upgrade_state') + if t: + self.upgrade_state = json.loads(t) + else: + self.upgrade_state = None + self.all_progress_references = list() # type: List[orchestrator.ProgressReference] # load inventory @@ -357,10 +366,30 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator): if h not in self.inventory: del self.service_cache[h] + # for serve() + self.run = True + self.event = Event() + def shutdown(self): self.log.error('shutdown') self._worker_pool.close() self._worker_pool.join() + self.run = False + self.event.set() + + def _kick_serve_loop(self): + self.log.debug('_kick_serve_loop') + self.event.set() + + def serve(self): + # type: () -> None + self.log.info("serve starting") + while self.run: + sleep_interval = 60*60 # this really doesn't matter + self.log.debug('Sleeping for %d seconds', sleep_interval) + ret = self.event.wait(sleep_interval) + self.event.clear() + self.log.info("serve exit") def config_notify(self): """ @@ -402,6 +431,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator): def _save_inventory(self): self.set_store('inventory', json.dumps(self.inventory)) + def _save_upgrade_state(self): + self.set_store('upgrade_state', json.dumps(self.upgrade_state)) + def _reconfig_ssh(self): temp_files = [] # type: list ssh_options = [] # type: List[str] @@ -1535,6 +1567,76 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator): return json.dumps(r, indent=4, sort_keys=True) + def upgrade_status(self): + r = orchestrator.UpgradeStatusSpec() + if self.upgrade_state: + r.target_image = self.upgrade_state.get('target_name') + r.in_progress = True + r.services_complete = ['foo', 'bar'] + if self.upgrade_state.get('paused'): + r.message = 'Upgrade paused' + return trivial_result(r) + + def upgrade_start(self, image, version): + if version: + target_name = self.container_image_base + ':v' + version + elif image: + target_name = image + else: + raise OrchestratorError('must specify either image or version') + if self.upgrade_state: + if self.upgrade_state.get('target_name') != target_name: + raise OrchestratorError( + 'Upgrade to %s (not %s) already in progress' % + (self.upgrade_state.get('target_name'), target_name)) + if self.upgrade_state.get('paused'): + del self.upgrade_state['paused'] + self._save_upgrade_state() + return trivial_result('Resumed upgrade to %s' % + self.upgrade_state.get('target_name')) + return trivial_result('Upgrade to %s in progress' % + self.upgrade_state.get('target_name')) + target_id, target_version = self._get_container_image_id(target_name) + self.upgrade_state = { + 'target_name': target_name, + 'target_id': target_id, + 'target_version': target_version, + } + self._save_upgrade_state() + self.event.set() + return trivial_result('Initiating upgrade to %s %s' % (image, target_id)) + + def upgrade_pause(self): + if not self.upgrade_state: + raise OrchestratorError('No upgrade in progress') + if self.upgrade_state.get('paused'): + return trivial_result('Upgrade to %s already paused' % + self.upgrade_state.get('target_name')) + self.upgrade_state['paused'] = True + self._save_upgrade_state() + return trivial_result('Paused upgrade to %s' % + self.upgrade_state.get('target_name')) + + def upgrade_resume(self): + if not self.upgrade_state: + raise OrchestratorError('No upgrade in progress') + if not self.upgrade_state.get('paused'): + return trivial_result('Upgrade to %s not paused' % + self.upgrade_state.get('target_name')) + del self.upgrade_state['paused'] + self._save_upgrade_state() + return trivial_result('Resumed upgrade to %s' % + self.upgrade_state.get('target_name')) + + def upgrade_stop(self): + if not self.upgrade_state: + return trivial_result('No upgrade in progress') + target_name = self.upgrade_state.get('target_name') + self.upgrade_state = None + self._save_upgrade_state() + return trivial_result('Stopped upgrade to %s' % target_name) + + class BaseScheduler(object): """ Base Scheduler Interface