git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: initial upgrade logic
Author:     Sage Weil <sage@redhat.com>
AuthorDate: Fri, 13 Dec 2019 22:30:06 +0000 (16:30 -0600)
Committer:  Sage Weil <sage@redhat.com>
CommitDate: Fri, 17 Jan 2020 18:22:02 +0000 (12:22 -0600)
- upgrade mgr, mon, osd, mds
- use ok-to-stop

Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/fixtures.py

index c6be640aadcbfe0c193c75d570afae883a83b6cf..0cc405ba37e4e87e4ece41576446528aa80bb4dd 100644 (file)
@@ -264,7 +264,7 @@ def with_services(service_type=None,
         return wrapper
     return decorator
 
-class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
+class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
     _STORE_HOST_PREFIX = "host"
 
@@ -331,6 +331,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         else:
             self.upgrade_state = None
 
+        self.health_checks = {}
+
         self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
 
         # load inventory
@@ -371,7 +373,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         self.event = Event()
 
     def shutdown(self):
-        self.log.error('shutdown')
+        self.log.info('shutdown')
         self._worker_pool.close()
         self._worker_pool.join()
         self.run = False
@@ -381,10 +383,203 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         self.log.debug('_kick_serve_loop')
         self.event.set()
 
+    def _wait_for_ok_to_stop(self, s):
+        # only wait a little bit; the service might go away for something
+        tries = 4
+        while tries > 0:
+            if s.service_type not in ['mon', 'osd', 'mds']:
+                return True  # no ok-to-stop command for this type; safe to proceed
+            ret, out, err = self.mon_command({
+                'prefix': '%s ok-to-stop' % s.service_type,
+                'ids': [s.service_instance],
+            })
+            if not self.upgrade_state or self.upgrade_state.get('paused'):
+                return False
+            if err:
+                self.log.info('Upgrade: not safe to stop %s.%s' %
+                              (s.service_type, s.service_instance))
+                time.sleep(15)
+                tries -= 1
+            else:
+                self.log.info('Upgrade: safe to stop %s.%s' %
+                              (s.service_type, s.service_instance))
+                return True
+
+    def _clear_health_checks(self):
+        self.health_checks = {}
+        self.set_health_checks(self.health_checks)
+
+    def _do_upgrade(self, daemons):
+        # type: (List[orchestrator.ServiceDescription]) -> Optional[AsyncCompletion]
+        if not self.upgrade_state:
+            self.log.debug('_do_upgrade no state, exiting')
+            return None
+
+        target_name = self.upgrade_state.get('target_name')
+        target_id = self.upgrade_state.get('target_id')
+        target_version = self.upgrade_state.get('target_version')
+        self.log.info('Upgrade: Target is %s with id %s' % (target_name,
+                                                            target_id))
+
+        # get all distinct container_image settings
+        image_settings = {}
+        ret, out, err = self.mon_command({
+            'prefix': 'config dump',
+            'format': 'json',
+        })
+        config = json.loads(out)
+        for opt in config:
+            if opt['name'] == 'container_image':
+                image_settings[opt['section']] = opt['value']
+
+        for daemon_type in ['mgr', 'mon', 'osd', 'rgw', 'mds']:
+            self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
+            need_upgrade_self = False
+            for d in daemons:
+                if d.service_type != daemon_type:
+                    continue
+                if daemon_type == 'mgr' and \
+                   d.service_instance == self.get_mgr_id():
+                    self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' %
+                                  self.get_mgr_id())
+                    need_upgrade_self = True
+                    continue
+                if d.container_image_id != target_id:
+                    if not self._wait_for_ok_to_stop(d):
+                        return None
+                    self.log.info('Upgrade: Redeploying %s.%s' %
+                                  (d.service_type, d.service_instance))
+                    ret, out, err = self.mon_command({
+                        'prefix': 'config set',
+                        'name': 'container_image',
+                        'value': target_name,
+                        'who': daemon_type + '.' + d.service_instance,
+                    })
+                    return self._service_action([(
+                        d.service_type,
+                        d.service_instance,
+                        d.nodename,
+                        'redeploy'
+                    )])
+
+            if need_upgrade_self:
+                mgr_map = self.get('mgr_map')
+                num = len(mgr_map.get('standbys'))
+                if not num:
+                    self.log.warning(
+                        'Upgrade: No standby mgrs and I need to update the mgr, '
+                        'suspending upgrade')
+                    self.upgrade_state['error'] = 'No standby mgrs and mgr.%s ' \
+                        'needs to be upgraded' % self.get_mgr_id()
+                    self.upgrade_state['paused'] = True
+                    self._save_upgrade_state()
+                    self.health_checks['UPGRADE_NO_STANDBY_MGR'] = {
+                        'severity': 'warning',
+                        'summary': 'Upgrade: Need standby mgr daemon',
+                        'count': 1,
+                        'detail': [
+                            'The upgrade process needs to upgrade the mgr, '
+                            'but it needs at least one standby to proceed.',
+                        ],
+                    }
+                    self.set_health_checks(self.health_checks)
+                    return None
+
+                self.log.info('Upgrade: there are %d other already-upgraded '
+                              'standby mgrs, failing over' % num)
+
+                # fail over
+                ret, out, err = self.mon_command({
+                    'prefix': 'mgr fail',
+                    'who': self.get_mgr_id(),
+                })
+                return None
+            elif daemon_type == 'mgr':
+                if 'UPGRADE_NO_STANDBY_MGR' in self.health_checks:
+                    del self.health_checks['UPGRADE_NO_STANDBY_MGR']
+                    self.set_health_checks(self.health_checks)
+
+            # make sure 'ceph versions' agrees
+            ret, out, err = self.mon_command({
+                'prefix': 'versions',
+            })
+            j = json.loads(out)
+            self.log.debug('j %s' % j)
+            for version, count in j.get(daemon_type, {}).items():
+                if version != target_version:
+                    self.log.warning(
+                        'Upgrade: %d %s daemon(s) are %s != target %s' %
+                        (count, daemon_type, version, target_version))
+
+            # push down configs
+            if image_settings.get(daemon_type) != target_name:
+                self.log.info('Upgrade: Setting container_image for all %s...' %
+                              daemon_type)
+                ret, out, err = self.mon_command({
+                    'prefix': 'config set',
+                    'name': 'container_image',
+                    'value': target_name,
+                    'who': daemon_type,
+                })
+            to_clean = []
+            for section in image_settings.keys():
+                if section.startswith(daemon_type + '.'):
+                    to_clean.append(section)
+            if to_clean:
+                self.log.info('Upgrade: Cleaning up container_image for %s...' %
+                              to_clean)
+                for section in to_clean:
+                    ret, image, err = self.mon_command({
+                        'prefix': 'config rm',
+                        'name': 'container_image',
+                        'who': section,
+                    })
+            self.log.info('Upgrade: All %s daemons are up to date.' %
+                          daemon_type)
+
+        # clean up
+        self.log.info('Upgrade: Finalizing container_image settings')
+        ret, out, err = self.mon_command({
+            'prefix': 'config set',
+            'name': 'container_image',
+            'value': target_name,
+            'who': 'global',
+        })
+        for daemon_type in ['mgr', 'mon', 'osd', 'rgw', 'mds']:
+            ret, image, err = self.mon_command({
+                'prefix': 'config rm',
+                'name': 'container_image',
+                'who': daemon_type,
+            })
+
+        self.log.info('Upgrade: Complete!')
+        self.upgrade_state = None
+        self._save_upgrade_state()
+        return None
+
     def serve(self):
         # type: () -> None
         self.log.info("serve starting")
+        orch_client = orchestrator.OrchestratorClientMixin()
+        orch_client.set_mgr(self.mgr)
         while self.run:
+            while self.upgrade_state and not self.upgrade_state.get('paused'):
+                self.log.debug('Upgrade in progress, refreshing services')
+                completion = self._get_services()
+                orch_client._orchestrator_wait([completion])
+                orchestrator.raise_if_exception(completion)
+                self.log.debug('services %s' % completion.result)
+                completion = self._do_upgrade(completion.result)
+                if completion:
+                    while not completion.has_result:
+                        self.process([completion])
+                        if completion.needs_result:
+                            time.sleep(1)
+                        else:
+                            break
+                    orchestrator.raise_if_exception(completion)
+                self.log.debug('did _do_upgrade')
+
             sleep_interval = 60*60  # this really doesn't matter
             self.log.debug('Sleeping for %d seconds', sleep_interval)
             ret = self.event.wait(sleep_interval)
@@ -788,7 +983,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         out, err, code = self._run_cephadm(
             host, 'mon', 'ls', [], no_fsid=True)
         data = json.loads(''.join(out))
-        self.log.error('refreshed host %s services: %s' % (host, data))
+        self.log.debug('Refreshed host %s services: %s' % (host, data))
         self.service_cache[host] = orchestrator.OutdatableData(data)
         return host, data
 
@@ -1574,8 +1769,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         if self.upgrade_state:
             r.target_image = self.upgrade_state.get('target_name')
             r.in_progress = True
-            r.services_complete = ['foo', 'bar']
-            if self.upgrade_state.get('paused'):
+            if self.upgrade_state.get('error'):
+                r.message = 'Error: ' + self.upgrade_state.get('error')
+            elif self.upgrade_state.get('paused'):
                 r.message = 'Upgrade paused'
         return trivial_result(r)
 
@@ -1605,6 +1801,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
             'target_version': target_version,
         }
         self._save_upgrade_state()
+        self._clear_health_checks()
         self.event.set()
         return trivial_result('Initiating upgrade to %s %s' % (image, target_id))
 
@@ -1627,6 +1824,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
                                   self.upgrade_state.get('target_name'))
         del self.upgrade_state['paused']
         self._save_upgrade_state()
+        self.event.set()
         return trivial_result('Resumed upgrade to %s' %
                               self.upgrade_state.get('target_name'))
 
@@ -1636,6 +1834,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         target_name = self.upgrade_state.get('target_name')
         self.upgrade_state = None
         self._save_upgrade_state()
+        self._clear_health_checks()
+        self.event.set()
         return trivial_result('Stopped upgrade to %s' % target_name)
 
 
index 052eb87ba658832f5d7d94c1d359f69b59cd8234..ac508256cf23babf2d0b3c4c19d26df67835303e 100644 (file)
@@ -18,7 +18,7 @@ def set_store(self, k, v):
 
 
 def get_store(self, k):
-    return self._store[k]
+    return self._store.get(k, None)
 
 
 def get_store_prefix(self, prefix):
@@ -46,6 +46,7 @@ def cephadm_module():
             'ssh_identity_key': '',
             'ssh_identity_pub': '',
             'inventory': {},
+            'upgrade_state': None,
         }
         m.__init__('cephadm', 0, 0)
         yield m