return wrapper
return decorator
-class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
+class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
_STORE_HOST_PREFIX = "host"
else:
self.upgrade_state = None
+ self.health_checks = {}
+
self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
# load inventory
self.event = Event()
def shutdown(self):
- self.log.error('shutdown')
+ self.log.info('shutdown')
self._worker_pool.close()
self._worker_pool.join()
self.run = False
self.log.debug('_kick_serve_loop')
self.event.set()
+ def _wait_for_ok_to_stop(self, s):
+ # only wait a little while; the blocking condition may clear on its own
+ tries = 4
+ while tries > 0:
+ if s.service_type not in ['mon', 'osd', 'mds']:
+ break
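+ # '<type> ok-to-stop' (e.g. 'mon ok-to-stop', 'osd ok-to-stop') returns
+ # nonzero when stopping the given daemons would risk quorum or data
+ # availability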
+ ret, out, err = self.mon_command({
+ 'prefix': '%s ok-to-stop' % s.service_type,
+ 'ids': [s.service_instance],
+ })
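+ # bail out if the upgrade was paused or cancelled while we waited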
+ if not self.upgrade_state or self.upgrade_state.get('paused'):
+ return False
+ if ret:
+ self.log.info('Upgrade: it is NOT safe to stop %s.%s' %
+ (s.service_type, s.service_instance))
+ time.sleep(15)
+ tries -= 1
+ else:
+ self.log.info('Upgrade: it is safe to stop %s.%s' %
+ (s.service_type, s.service_instance))
+ return True
+ # daemon types with no ok-to-stop command fall through to here
+ return True
+
+ def _clear_health_checks(self):
+ self.health_checks = {}
+ self.set_health_checks(self.health_checks)
+
+ def _do_upgrade(self, daemons):
+ # type: (List[orchestrator.ServiceDescription]) -> Optional[AsyncCompletion]
+ if not self.upgrade_state:
+ self.log.debug('_do_upgrade no state, exiting')
+ return None
+
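+ # upgrade_state is a plain dict (persisted via _save_upgrade_state); the
+ # keys used here are 'target_name', 'target_id', and 'target_version',
+ # plus the optional 'paused' and 'error' flags set elsewhere in this file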
+ target_name = self.upgrade_state.get('target_name')
+ target_id = self.upgrade_state.get('target_id')
+ target_version = self.upgrade_state.get('target_version')
+ self.log.info('Upgrade: Target is %s with id %s' % (target_name,
+ target_id))
+
+ # get all distinct container_image settings
+ image_settings = {}
+ ret, out, err = self.mon_command({
+ 'prefix': 'config dump',
+ 'format': 'json',
+ })
+ config = json.loads(out)
+ for opt in config:
+ if opt['name'] == 'container_image':
+ image_settings[opt['section']] = opt['value']
+
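+ # walk daemon types in a safe order: mgrs first (so the new code drives
+ # the rest of the upgrade), then mons, osds, and finally rgw and mds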
+ for daemon_type in ['mgr', 'mon', 'osd', 'rgw', 'mds']:
+ self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
+ need_upgrade_self = False
+ for d in daemons:
+ if d.service_type != daemon_type:
+ continue
+ if daemon_type == 'mgr' and \
+ d.service_instance == self.get_mgr_id():
+ self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' %
+ self.get_mgr_id())
+ need_upgrade_self = True
+ continue
+ if d.container_image_id != target_id:
+ if not self._wait_for_ok_to_stop(d):
+ return None
+ self.log.info('Upgrade: Redeploying %s.%s' %
+ (d.service_type, d.service_instance))
+ ret, out, err = self.mon_command({
+ 'prefix': 'config set',
+ 'name': 'container_image',
+ 'value': target_name,
+ 'who': daemon_type + '.' + d.service_instance,
+ })
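+ # redeploy just this one daemon and return the completion; serve()
+ # waits for it and then calls _do_upgrade again for the next daemon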
+ return self._service_action([(
+ d.service_type,
+ d.service_instance,
+ d.nodename,
+ 'redeploy'
+ )])
+
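+ # the active mgr cannot redeploy itself; hand off to an
+ # already-upgraded standby instead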
+ if need_upgrade_self:
+ mgr_map = self.get('mgr_map')
+ num = len(mgr_map.get('standbys', []))
+ if not num:
+ self.log.warning(
+ 'Upgrade: No standby mgrs and I need to update the mgr, '
+ 'suspending upgrade')
+ self.upgrade_state['error'] = 'No standby mgrs and mgr.%s ' \
+ 'needs to be upgraded' % self.get_mgr_id()
+ self.upgrade_state['paused'] = True
+ self._save_upgrade_state()
+ self.health_checks['UPGRADE_NO_STANDBY_MGR'] = {
+ 'severity': 'warning',
+ 'summary': 'Upgrade: Need standby mgr daemon',
+ 'count': 1,
+ 'detail': [
+ 'The upgrade process needs to upgrade the mgr, '
+ 'but it needs at least one standby to proceed.',
+ ],
+ }
+ self.set_health_checks(self.health_checks)
+ return None
+
+ self.log.info('Upgrade: there are %d other already-upgraded '
+ 'standby mgrs, failing over' % num)
+
+ # fail over
+ ret, out, err = self.mon_command({
+ 'prefix': 'mgr fail',
+ 'who': self.get_mgr_id(),
+ })
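+ # this mgr will restart; the standby that takes over resumes the
+ # upgrade from the persisted upgrade_state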
+ return None
+ elif daemon_type == 'mgr':
+ if 'UPGRADE_NO_STANDBY_MGR' in self.health_checks:
+ del self.health_checks['UPGRADE_NO_STANDBY_MGR']
+ self.set_health_checks(self.health_checks)
+
+ # make sure 'ceph versions' agrees
+ ret, out, err = self.mon_command({
+ 'prefix': 'versions',
+ })
+ j = json.loads(out)
+ self.log.debug('ceph versions: %s' % j)
+ for version, count in j.get(daemon_type, {}).items():
+ if version != target_version:
+ self.log.warning(
+ 'Upgrade: %d %s daemon(s) are %s != target %s' %
+ (count, daemon_type, version, target_version))
+
+ # push down configs
+ if image_settings.get(daemon_type) != target_name:
+ self.log.info('Upgrade: Setting container_image for all %s...' %
+ daemon_type)
+ ret, out, err = self.mon_command({
+ 'prefix': 'config set',
+ 'name': 'container_image',
+ 'value': target_name,
+ 'who': daemon_type,
+ })
+ to_clean = []
+ for section in image_settings.keys():
+ if section.startswith(daemon_type + '.'):
+ to_clean.append(section)
+ if to_clean:
+ self.log.info('Upgrade: Cleaning up container_image for %s...' %
+ to_clean)
+ for section in to_clean:
+ ret, image, err = self.mon_command({
+ 'prefix': 'config rm',
+ 'name': 'container_image',
+ 'who': section,
+ })
+ self.log.info('Upgrade: All %s daemons are up to date.' %
+ daemon_type)
+
+ # clean up
+ self.log.info('Upgrade: Finalizing container_image settings')
+ ret, out, err = self.mon_command({
+ 'prefix': 'config set',
+ 'name': 'container_image',
+ 'value': target_name,
+ 'who': 'global',
+ })
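+ # with the global default in place, the per-daemon-type overrides set
+ # during the rolling upgrade are no longer needed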
+ for daemon_type in ['mgr', 'mon', 'osd', 'rgw', 'mds']:
+ ret, image, err = self.mon_command({
+ 'prefix': 'config rm',
+ 'name': 'container_image',
+ 'who': daemon_type,
+ })
+
+ self.log.info('Upgrade: Complete!')
+ self.upgrade_state = None
+ self._save_upgrade_state()
+ return None
+
def serve(self):
# type: () -> None
self.log.info("serve starting")
+ orch_client = orchestrator.OrchestratorClientMixin()
+ orch_client.set_mgr(self)  # this module is itself the MgrModule
while self.run:
+ while self.upgrade_state and not self.upgrade_state.get('paused'):
+ self.log.debug('Upgrade in progress, refreshing services')
+ completion = self._get_services()
+ orch_client._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
+ self.log.debug('services %s' % completion.result)
+ completion = self._do_upgrade(completion.result)
+ if completion:
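+ # drive the redeploy completion to a result before looping back to
+ # check the next daemon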
+ while not completion.has_result:
+ self.process([completion])
+ if completion.needs_result:
+ time.sleep(1)
+ else:
+ break
+ orchestrator.raise_if_exception(completion)
+ self.log.debug('_do_upgrade completed')
+
sleep_interval = 60*60 # this really doesn't matter
self.log.debug('Sleeping for %d seconds', sleep_interval)
ret = self.event.wait(sleep_interval)
out, err, code = self._run_cephadm(
host, 'mon', 'ls', [], no_fsid=True)
data = json.loads(''.join(out))
- self.log.error('refreshed host %s services: %s' % (host, data))
+ self.log.debug('Refreshed host %s services: %s' % (host, data))
self.service_cache[host] = orchestrator.OutdatableData(data)
return host, data
if self.upgrade_state:
r.target_image = self.upgrade_state.get('target_name')
r.in_progress = True
- r.services_complete = ['foo', 'bar']
- if self.upgrade_state.get('paused'):
+ if self.upgrade_state.get('error'):
+ r.message = 'Error: ' + self.upgrade_state.get('error')
+ elif self.upgrade_state.get('paused'):
r.message = 'Upgrade paused'
return trivial_result(r)
'target_version': target_version,
}
self._save_upgrade_state()
+ self._clear_health_checks()
self.event.set()
return trivial_result('Initiating upgrade to %s %s' % (image, target_id))
self.upgrade_state.get('target_name'))
del self.upgrade_state['paused']
self._save_upgrade_state()
+ self.event.set()
return trivial_result('Resumed upgrade to %s' %
self.upgrade_state.get('target_name'))
target_name = self.upgrade_state.get('target_name')
self.upgrade_state = None
self._save_upgrade_state()
+ self._clear_health_checks()
+ self.event.set()
return trivial_result('Stopped upgrade to %s' % target_name)