From: Sage Weil
Date: Fri, 13 Dec 2019 22:30:06 +0000 (-0600)
Subject: mgr/cephadm: initial upgrade logic
X-Git-Tag: v15.1.0~171^2~1
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=446cdf74f7ae59ea33427bb6fb2bd9ab24c1992b;p=ceph.git

mgr/cephadm: initial upgrade logic

- upgrade mgr, mon, osd, mds
- use ok-to-stop

Signed-off-by: Sage Weil
---

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index c6be640aadcbf..0cc405ba37e4e 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -264,7 +264,7 @@ def with_services(service_type=None,
         return wrapper
     return decorator
 
-class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
+class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
     _STORE_HOST_PREFIX = "host"
 
@@ -331,6 +331,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         else:
             self.upgrade_state = None
 
+        self.health_checks = {}
+
         self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
 
         # load inventory
@@ -371,7 +373,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         self.event = Event()
 
     def shutdown(self):
-        self.log.error('shutdown')
+        self.log.info('shutdown')
         self._worker_pool.close()
         self._worker_pool.join()
         self.run = False
@@ -381,10 +383,203 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         self.log.debug('_kick_serve_loop')
         self.event.set()
 
+    def _wait_for_ok_to_stop(self, s):
+        # only wait a little bit; the service might go away for something
+        tries = 4
+        while tries > 0:
+            if s.service_type not in ['mon', 'osd', 'mds']:
+                break
+            ret, out, err = self.mon_command({
+                'prefix': '%s ok-to-stop' % s.service_type,
+                'ids': [s.service_instance],
+            })
+            if not self.upgrade_state or self.upgrade_state.get('paused'):
+                return False
+            if ret:
+                self.log.info('Upgrade: not safe to stop %s.%s' %
+                              (s.service_type, s.service_instance))
+                time.sleep(15)
+                tries -= 1
+            else:
+                self.log.info('Upgrade: safe to stop %s.%s' %
+                              (s.service_type, s.service_instance))
+                return True
+
+    def _clear_health_checks(self):
+        self.health_checks = {}
+        self.set_health_checks(self.health_checks)
+
+    def _do_upgrade(self, daemons):
+        # type: (List[orchestrator.ServiceDescription]) -> Optional[AsyncCompletion]
+        if not self.upgrade_state:
+            self.log.debug('_do_upgrade no state, exiting')
+            return None
+
+        target_name = self.upgrade_state.get('target_name')
+        target_id = self.upgrade_state.get('target_id')
+        target_version = self.upgrade_state.get('target_version')
+        self.log.info('Upgrade: Target is %s with id %s' % (target_name,
+                                                            target_id))
+
+        # get all distinct container_image settings
+        image_settings = {}
+        ret, out, err = self.mon_command({
+            'prefix': 'config dump',
+            'format': 'json',
+        })
+        config = json.loads(out)
+        for opt in config:
+            if opt['name'] == 'container_image':
+                image_settings[opt['section']] = opt['value']
+
+        for daemon_type in ['mgr', 'mon', 'osd', 'rgw', 'mds']:
+            self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
+            need_upgrade_self = False
+            for d in daemons:
+                if d.service_type != daemon_type:
+                    continue
+                if daemon_type == 'mgr' and \
+                   d.service_instance == self.get_mgr_id():
+                    self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' %
+                                  self.get_mgr_id())
+                    need_upgrade_self = True
+                    continue
+                if d.container_image_id != target_id:
+                    if not self._wait_for_ok_to_stop(d):
+                        return None
+                    self.log.info('Upgrade: Redeploying %s.%s' %
+                                  (d.service_type, d.service_instance))
+                    ret, out, err = self.mon_command({
+                        'prefix': 'config set',
+                        'name': 'container_image',
+                        'value': target_name,
+                        'who': daemon_type + '.' + d.service_instance,
+                    })
+                    return self._service_action([(
+                        d.service_type,
+                        d.service_instance,
+                        d.nodename,
+                        'redeploy'
+                    )])
+
+            if need_upgrade_self:
+                mgr_map = self.get('mgr_map')
+                num = len(mgr_map.get('standbys'))
+                if not num:
+                    self.log.warning(
+                        'Upgrade: No standby mgrs and I need to update the mgr, '
+                        'suspending upgrade')
+                    self.upgrade_state['error'] = 'No standby mgrs and mgr.%s ' \
+                        'needs to be upgraded' % self.get_mgr_id()
+                    self.upgrade_state['paused'] = True
+                    self._save_upgrade_state()
+                    self.health_checks['UPGRADE_NO_STANDBY_MGR'] = {
+                        'severity': 'warning',
+                        'summary': 'Upgrade: Need standby mgr daemon',
+                        'count': 1,
+                        'detail': [
+                            'The upgrade process needs to upgrade the mgr, '
+                            'but it needs at least one standby to proceed.',
+                        ],
+                    }
+                    self.set_health_checks(self.health_checks)
+                    return None
+
+                self.log.info('Upgrade: there are %d other already-upgraded '
+                              'standby mgrs, failing over' % num)
+
+                # fail over
+                ret, out, err = self.mon_command({
+                    'prefix': 'mgr fail',
+                    'who': self.get_mgr_id(),
+                })
+                return None
+            elif daemon_type == 'mgr':
+                if 'UPGRADE_NO_STANDBY_MGR' in self.health_checks:
+                    del self.health_checks['UPGRADE_NO_STANDBY_MGR']
+                    self.set_health_checks(self.health_checks)
+
+            # make sure 'ceph versions' agrees
+            ret, out, err = self.mon_command({
+                'prefix': 'versions',
+            })
+            j = json.loads(out)
+            self.log.debug('j %s' % j)
+            for version, count in j.get(daemon_type, {}).items():
+                if version != target_version:
+                    self.log.warning(
+                        'Upgrade: %d %s daemon(s) are %s != target %s' %
+                        (count, daemon_type, version, target_version))
+
+            # push down configs
+            if image_settings.get(daemon_type) != target_name:
+                self.log.info('Upgrade: Setting container_image for all %s...' %
+                              daemon_type)
+                ret, out, err = self.mon_command({
+                    'prefix': 'config set',
+                    'name': 'container_image',
+                    'value': target_name,
+                    'who': daemon_type,
+                })
+            to_clean = []
+            for section in image_settings.keys():
+                if section.startswith(daemon_type + '.'):
+                    to_clean.append(section)
+            if to_clean:
+                self.log.info('Upgrade: Cleaning up container_image for %s...' %
+                              to_clean)
+                for section in to_clean:
+                    ret, image, err = self.mon_command({
+                        'prefix': 'config rm',
+                        'name': 'container_image',
+                        'who': section,
+                    })
+            self.log.info('Upgrade: All %s daemons are up to date.' %
+                          daemon_type)
+
+        # clean up
+        self.log.info('Upgrade: Finalizing container_image settings')
+        ret, out, err = self.mon_command({
+            'prefix': 'config set',
+            'name': 'container_image',
+            'value': target_name,
+            'who': 'global',
+        })
+        for daemon_type in ['mgr', 'mon', 'osd', 'rgw', 'mds']:
+            ret, image, err = self.mon_command({
+                'prefix': 'config rm',
+                'name': 'container_image',
+                'who': daemon_type,
+            })
+
+        self.log.info('Upgrade: Complete!')
+        self.upgrade_state = None
+        self._save_upgrade_state()
+        return None
+
     def serve(self):
         # type: () -> None
         self.log.info("serve starting")
+        orch_client = orchestrator.OrchestratorClientMixin()
+        orch_client.set_mgr(self.mgr)
         while self.run:
+            while self.upgrade_state and not self.upgrade_state.get('paused'):
+                self.log.debug('Upgrade in progress, refreshing services')
+                completion = self._get_services()
+                orch_client._orchestrator_wait([completion])
+                orchestrator.raise_if_exception(completion)
+                self.log.debug('services %s' % completion.result)
+                completion = self._do_upgrade(completion.result)
+                if completion:
+                    while not completion.has_result:
+                        self.process([completion])
+                        if completion.needs_result:
+                            time.sleep(1)
+                        else:
+                            break
+                    orchestrator.raise_if_exception(completion)
+                self.log.debug('did _do_upgrade')
+
             sleep_interval = 60*60  # this really doesn't matter
             self.log.debug('Sleeping for %d seconds', sleep_interval)
             ret = self.event.wait(sleep_interval)
@@ -788,7 +983,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         out, err, code = self._run_cephadm(
             host, 'mon', 'ls', [], no_fsid=True)
         data = json.loads(''.join(out))
-        self.log.error('refreshed host %s services: %s' % (host, data))
+        self.log.debug('Refreshed host %s services: %s' % (host, data))
         self.service_cache[host] = orchestrator.OutdatableData(data)
         return host, data
 
@@ -1574,8 +1769,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         if self.upgrade_state:
             r.target_image = self.upgrade_state.get('target_name')
             r.in_progress = True
-            r.services_complete = ['foo', 'bar']
-            if self.upgrade_state.get('paused'):
+            if self.upgrade_state.get('error'):
+                r.message = 'Error: ' + self.upgrade_state.get('error')
+            elif self.upgrade_state.get('paused'):
                 r.message = 'Upgrade paused'
         return trivial_result(r)
 
@@ -1605,6 +1801,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
             'target_version': target_version,
         }
         self._save_upgrade_state()
+        self._clear_health_checks()
         self.event.set()
         return trivial_result('Initiating upgrade to %s %s' % (image, target_id))
 
@@ -1627,6 +1824,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
                 self.upgrade_state.get('target_name'))
         del self.upgrade_state['paused']
         self._save_upgrade_state()
+        self.event.set()
         return trivial_result('Resumed upgrade to %s' %
                               self.upgrade_state.get('target_name'))
 
@@ -1636,6 +1834,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
         target_name = self.upgrade_state.get('target_name')
         self.upgrade_state = None
         self._save_upgrade_state()
+        self._clear_health_checks()
+        self.event.set()
         return trivial_result('Stopped upgrade to %s' % target_name)
 
diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py
index 052eb87ba6588..ac508256cf23b 100644
--- a/src/pybind/mgr/cephadm/tests/fixtures.py
+++ b/src/pybind/mgr/cephadm/tests/fixtures.py
@@ -18,7 +18,7 @@ def set_store(self, k, v):
 
 
 def get_store(self, k):
-    return self._store[k]
+    return self._store.get(k, None)
 
 
 def get_store_prefix(self, prefix):
@@ -46,6 +46,7 @@ def cephadm_module():
             'ssh_identity_key': '',
             'ssh_identity_pub': '',
             'inventory': {},
+            'upgrade_state': None,
         }
         m.__init__('cephadm', 0, 0)
         yield m
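
For readers tracing the control flow above, here is a minimal standalone sketch (not part of the patch) of the ordering that _do_upgrade() and _wait_for_ok_to_stop() implement: walk daemon types in a fixed order, skip daemons already on the target image, gate each stop on an ok-to-stop check with a few retries, and redeploy at most one daemon per pass. The upgrade_step and wait_for_ok_to_stop names and the ok_to_stop/redeploy callables are hypothetical stand-ins for the '<type> ok-to-stop' mon command and the 'redeploy' service action used in the patch; the real module drives this from serve() and persists its progress in upgrade_state.

# Illustrative sketch only; assumes simplified daemon dicts instead of
# ServiceDescription objects and injected ok_to_stop/redeploy callables.
import time

UPGRADE_ORDER = ['mgr', 'mon', 'osd', 'rgw', 'mds']  # same order as _do_upgrade()


def wait_for_ok_to_stop(daemon, ok_to_stop, tries=4, delay=15):
    # Only mon/osd/mds daemons have an ok-to-stop check; retry a few times
    # before giving up for this pass (mirrors _wait_for_ok_to_stop above).
    if daemon['type'] not in ('mon', 'osd', 'mds'):
        return True
    for _ in range(tries):
        if ok_to_stop(daemon):
            return True
        time.sleep(delay)
    return False


def upgrade_step(daemons, target_image, ok_to_stop, redeploy):
    # Redeploy at most one out-of-date daemon per call, in UPGRADE_ORDER;
    # the caller keeps invoking this until it reports 'done'.
    for daemon_type in UPGRADE_ORDER:
        for d in (d for d in daemons if d['type'] == daemon_type):
            if d['image'] == target_image:
                continue  # already on the target image
            if not wait_for_ok_to_stop(d, ok_to_stop):
                return None  # not safe yet; try again on the next pass
            return redeploy(d, target_image)
    return 'done'


if __name__ == '__main__':
    daemons = [{'type': 'mon', 'name': 'mon.a', 'image': 'old'},
               {'type': 'osd', 'name': 'osd.0', 'image': 'old'}]

    def fake_redeploy(d, image):
        d['image'] = image
        return 'redeployed %s' % d['name']

    result = None
    while result != 'done':
        result = upgrade_step(daemons, 'new', lambda d: True, fake_redeploy)
        print(result)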