git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: upgrade start/stop/pause/resume
author: Sage Weil <sage@redhat.com>
Fri, 13 Dec 2019 22:29:07 +0000 (16:29 -0600)
committer: Sage Weil <sage@redhat.com>
Fri, 17 Jan 2020 18:22:02 +0000 (12:22 -0600)
No actual upgrading yet.

Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/cephadm/module.py

index 79a0df1e4ef463ff29b0ab7b2a7e8812158fddea..ddf88133d3d2ccc0d4f055eb6a0278c02bd667bd 100644 (file)
@@ -1,6 +1,8 @@
 import json
 import errno
 import logging
+import time
+from threading import Event
 from functools import wraps
 
 import string
@@ -322,6 +324,13 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._reconfig_ssh()
 
         CephadmOrchestrator.instance = self
+
+        t = self.get_store('upgrade_state')
+        if t:
+            self.upgrade_state = json.loads(t)
+        else:
+            self.upgrade_state = None
+
         self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
 
         # load inventory
@@ -357,10 +366,30 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
             if h not in self.inventory:
                 del self.service_cache[h]
 
+        # for serve()
+        self.run = True
+        self.event = Event()
+
     def shutdown(self):
+        """Close and join the worker pool, then tell the serve() loop to exit."""
         self.log.error('shutdown')
         self._worker_pool.close()
         self._worker_pool.join()
+        # Clear the loop flag before setting the event, so that when serve()
+        # returns from its wait it re-checks self.run and leaves the loop.
+        self.run = False
+        self.event.set()
+
+    def _kick_serve_loop(self):
+        """Set self.event so that a serve() loop waiting on it wakes up."""
+        self.log.debug('_kick_serve_loop')
+        self.event.set()
+
+    def serve(self):
+        # type: () -> None
+        """
+        Run the module's background loop until self.run becomes false.
+
+        Each pass only waits on self.event (or for the timeout to expire)
+        and then clears the event; no other work is done in the loop yet.
+        """
+        self.log.info("serve starting")
+        while self.run:
+            sleep_interval = 60*60  # this really doesn't matter
+            self.log.debug('Sleeping for %d seconds', sleep_interval)
+            # The wait result is intentionally ignored: the loop behaves the
+            # same whether it was woken by the event or by the timeout.
+            self.event.wait(sleep_interval)
+            self.event.clear()
+        self.log.info("serve exit")
 
     def config_notify(self):
         """
@@ -402,6 +431,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
     def _save_inventory(self):
+        # Persist self.inventory as JSON under the 'inventory' store key.
         self.set_store('inventory', json.dumps(self.inventory))
 
+    def _save_upgrade_state(self):
+        """
+        Persist self.upgrade_state as JSON under the 'upgrade_state' store key.
+
+        A state of None (no upgrade) is serialized as the JSON text 'null'.
+        """
+        self.set_store('upgrade_state', json.dumps(self.upgrade_state))
+
     def _reconfig_ssh(self):
         temp_files = []  # type: list
         ssh_options = []  # type: List[str]
@@ -1535,6 +1567,76 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         return json.dumps(r, indent=4, sort_keys=True)
 
 
+    def upgrade_status(self):
+        """
+        Report the current upgrade state.
+
+        Returns trivial_result() wrapping an orchestrator.UpgradeStatusSpec.
+        When an upgrade state exists, the spec carries the target image name
+        and in_progress=True, plus the message 'Upgrade paused' if the state
+        is paused; otherwise the spec is returned as constructed.
+        """
+        r = orchestrator.UpgradeStatusSpec()
+        if self.upgrade_state:
+            r.target_image = self.upgrade_state.get('target_name')
+            r.in_progress = True
+            # NOTE(review): hard-coded placeholder values, not real progress
+            # data -- TODO replace once completed services are tracked.
+            r.services_complete = ['foo', 'bar']
+            if self.upgrade_state.get('paused'):
+                r.message = 'Upgrade paused'
+        return trivial_result(r)
+
+    def upgrade_start(self, image, version):
+        """
+        Start an upgrade, or resume/acknowledge one that is already recorded.
+
+        :param image: full container image name to upgrade to; used only
+            when ``version`` is empty.
+        :param version: version string; when given it takes precedence and
+            the target becomes ``container_image_base + ':v' + version``.
+        :return: trivial_result() with a human-readable status message.
+        :raises OrchestratorError: if neither ``image`` nor ``version`` is
+            given, or if an upgrade to a different target is in progress.
+        """
+        if version:
+            target_name = self.container_image_base + ':v' + version
+        elif image:
+            target_name = image
+        else:
+            raise OrchestratorError('must specify either image or version')
+        if self.upgrade_state:
+            if self.upgrade_state.get('target_name') != target_name:
+                raise OrchestratorError(
+                    'Upgrade to %s (not %s) already in progress' %
+                    (self.upgrade_state.get('target_name'), target_name))
+            # Same target requested again: un-pause if paused, otherwise
+            # just report that the upgrade is already running.
+            if self.upgrade_state.get('paused'):
+                del self.upgrade_state['paused']
+                self._save_upgrade_state()
+                return trivial_result('Resumed upgrade to %s' %
+                                      self.upgrade_state.get('target_name'))
+            return trivial_result('Upgrade to %s in progress' %
+                                  self.upgrade_state.get('target_name'))
+        target_id, target_version = self._get_container_image_id(target_name)
+        self.upgrade_state = {
+            'target_name': target_name,
+            'target_id': target_id,
+            'target_version': target_version,
+        }
+        self._save_upgrade_state()
+        # Wake the serve() loop so it notices the new upgrade state.
+        self.event.set()
+        # Report the resolved target name: ``image`` is None when the caller
+        # passed only ``version``, and is not the target when both are given.
+        return trivial_result('Initiating upgrade to %s %s' % (target_name, target_id))
+
+    def upgrade_pause(self):
+        """
+        Mark the in-progress upgrade as paused and persist that state.
+
+        Returns trivial_result() with a status message; if the upgrade is
+        already paused nothing is changed and the message says so.
+        Raises OrchestratorError when no upgrade state exists.
+        """
+        if not self.upgrade_state:
+            raise OrchestratorError('No upgrade in progress')
+        if self.upgrade_state.get('paused'):
+            return trivial_result('Upgrade to %s already paused' %
+                                  self.upgrade_state.get('target_name'))
+        self.upgrade_state['paused'] = True
+        self._save_upgrade_state()
+        return trivial_result('Paused upgrade to %s' %
+                              self.upgrade_state.get('target_name'))
+
+    def upgrade_resume(self):
+        """
+        Clear the paused flag of the in-progress upgrade and persist the state.
+
+        Returns trivial_result() with a status message; if the upgrade is
+        not paused nothing is changed and the message says so.
+        Raises OrchestratorError when no upgrade state exists.
+        """
+        if not self.upgrade_state:
+            raise OrchestratorError('No upgrade in progress')
+        if not self.upgrade_state.get('paused'):
+            return trivial_result('Upgrade to %s not paused' %
+                                  self.upgrade_state.get('target_name'))
+        # The flag is removed rather than set to False; readers use
+        # .get('paused'), so an absent key means "not paused".
+        del self.upgrade_state['paused']
+        self._save_upgrade_state()
+        return trivial_result('Resumed upgrade to %s' %
+                              self.upgrade_state.get('target_name'))
+
+    def upgrade_stop(self):
+        """
+        Discard the recorded upgrade state and persist its absence.
+
+        Returns trivial_result() with a status message. Unlike
+        upgrade_pause()/upgrade_resume(), this does not raise when no
+        upgrade state exists; it reports 'No upgrade in progress' instead.
+        """
+        if not self.upgrade_state:
+            return trivial_result('No upgrade in progress')
+        # Capture the name before the state is dropped, for the message.
+        target_name = self.upgrade_state.get('target_name')
+        self.upgrade_state = None
+        self._save_upgrade_state()
+        return trivial_result('Stopped upgrade to %s' % target_name)
+
+
 class BaseScheduler(object):
     """
     Base Scheduler Interface