import json
import errno
import logging
+import time
+from threading import Event
from functools import wraps
import string
self._reconfig_ssh()
CephadmOrchestrator.instance = self
+
+ t = self.get_store('upgrade_state')
+ if t:
+ self.upgrade_state = json.loads(t)
+ else:
+ self.upgrade_state = None
+
self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
# load inventory
if h not in self.inventory:
del self.service_cache[h]
+ # for serve()
+ self.run = True
+ self.event = Event()
+
def shutdown(self):
self.log.error('shutdown')
self._worker_pool.close()
self._worker_pool.join()
+ self.run = False
+ self.event.set()
+
+ def _kick_serve_loop(self):
+ self.log.debug('_kick_serve_loop')
+ self.event.set()
+
+ def serve(self):
+ # type: () -> None
+ self.log.info("serve starting")
+ while self.run:
+ sleep_interval = 60*60 # this really doesn't matter
+ self.log.debug('Sleeping for %d seconds', sleep_interval)
+ ret = self.event.wait(sleep_interval)
+ self.event.clear()
+ self.log.info("serve exit")
def config_notify(self):
"""
def _save_inventory(self):
self.set_store('inventory', json.dumps(self.inventory))
+ def _save_upgrade_state(self):
+ self.set_store('upgrade_state', json.dumps(self.upgrade_state))
+
def _reconfig_ssh(self):
temp_files = [] # type: list
ssh_options = [] # type: List[str]
return json.dumps(r, indent=4, sort_keys=True)
+ def upgrade_status(self):
+ r = orchestrator.UpgradeStatusSpec()
+ if self.upgrade_state:
+ r.target_image = self.upgrade_state.get('target_name')
+ r.in_progress = True
+ r.services_complete = ['foo', 'bar']
+ if self.upgrade_state.get('paused'):
+ r.message = 'Upgrade paused'
+ return trivial_result(r)
+
+ def upgrade_start(self, image, version):
+ if version:
+ target_name = self.container_image_base + ':v' + version
+ elif image:
+ target_name = image
+ else:
+ raise OrchestratorError('must specify either image or version')
+ if self.upgrade_state:
+ if self.upgrade_state.get('target_name') != target_name:
+ raise OrchestratorError(
+ 'Upgrade to %s (not %s) already in progress' %
+ (self.upgrade_state.get('target_name'), target_name))
+ if self.upgrade_state.get('paused'):
+ del self.upgrade_state['paused']
+ self._save_upgrade_state()
+ return trivial_result('Resumed upgrade to %s' %
+ self.upgrade_state.get('target_name'))
+ return trivial_result('Upgrade to %s in progress' %
+ self.upgrade_state.get('target_name'))
+ target_id, target_version = self._get_container_image_id(target_name)
+ self.upgrade_state = {
+ 'target_name': target_name,
+ 'target_id': target_id,
+ 'target_version': target_version,
+ }
+ self._save_upgrade_state()
+ self.event.set()
+ return trivial_result('Initiating upgrade to %s %s' % (image, target_id))
+
+ def upgrade_pause(self):
+ if not self.upgrade_state:
+ raise OrchestratorError('No upgrade in progress')
+ if self.upgrade_state.get('paused'):
+ return trivial_result('Upgrade to %s already paused' %
+ self.upgrade_state.get('target_name'))
+ self.upgrade_state['paused'] = True
+ self._save_upgrade_state()
+ return trivial_result('Paused upgrade to %s' %
+ self.upgrade_state.get('target_name'))
+
+ def upgrade_resume(self):
+ if not self.upgrade_state:
+ raise OrchestratorError('No upgrade in progress')
+ if not self.upgrade_state.get('paused'):
+ return trivial_result('Upgrade to %s not paused' %
+ self.upgrade_state.get('target_name'))
+ del self.upgrade_state['paused']
+ self._save_upgrade_state()
+ return trivial_result('Resumed upgrade to %s' %
+ self.upgrade_state.get('target_name'))
+
+ def upgrade_stop(self):
+ if not self.upgrade_state:
+ return trivial_result('No upgrade in progress')
+ target_name = self.upgrade_state.get('target_name')
+ self.upgrade_state = None
+ self._save_upgrade_state()
+ return trivial_result('Stopped upgrade to %s' % target_name)
+
+
class BaseScheduler(object):
"""
Base Scheduler Interface