self.last_etc_ceph_ceph_conf: Dict[str, datetime.datetime] = {}
self.registry_login_queue: Set[str] = set()
+ self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}
+
def load(self):
# type: () -> None
for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)):
self.last_etc_ceph_ceph_conf[host] = datetime.datetime.strptime(
j['last_etc_ceph_ceph_conf'], DATEFMT)
self.registry_login_queue.add(host)
+ self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
+
self.mgr.log.debug(
'HostCache.load: host %s has %d daemons, '
'%d devices, %d networks' % (
def distribute_new_registry_login_info(self):
self.registry_login_queue = set(self.mgr.inventory.keys())
- def save_host(self, host):
- # type: (str) -> None
- j = { # type: ignore
+ def save_host(self, host: str) -> None:
+ j: Dict[str, Any] = {
'daemons': {},
'devices': [],
'osdspec_previews': [],
}
if host in self.last_daemon_update:
j['last_daemon_update'] = self.last_daemon_update[host].strftime(
- DATEFMT) # type: ignore
+ DATEFMT)
if host in self.last_device_update:
j['last_device_update'] = self.last_device_update[host].strftime(
- DATEFMT) # type: ignore
+ DATEFMT)
for name, dd in self.daemons[host].items():
- j['daemons'][name] = dd.to_json() # type: ignore
+ j['daemons'][name] = dd.to_json()
for d in self.devices[host]:
- j['devices'].append(d.to_json()) # type: ignore
+ j['devices'].append(d.to_json())
j['networks'] = self.networks[host]
for name, depi in self.daemon_config_deps[host].items():
- j['daemon_config_deps'][name] = { # type: ignore
+ j['daemon_config_deps'][name] = {
'deps': depi.get('deps', []),
'last_config': depi['last_config'].strftime(DATEFMT),
}
if host in self.last_etc_ceph_ceph_conf:
j['last_etc_ceph_ceph_conf'] = self.last_etc_ceph_ceph_conf[host].strftime(DATEFMT)
+ if self.scheduled_daemon_actions.get(host, {}):
+ j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
del self.last_device_update[host]
if host in self.daemon_config_deps:
del self.daemon_config_deps[host]
+ if host in self.scheduled_daemon_actions:
+ del self.scheduled_daemon_actions[host]
self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
def get_hosts(self):
return
self.last_etc_ceph_ceph_conf[host] = self.mgr.last_monmap
- def host_needs_registry_login(self, host):
+ def host_needs_registry_login(self, host: str) -> bool:
if host in self.mgr.offline_hosts:
return False
if host in self.registry_login_queue:
return all((h in self.last_daemon_update or h in self.mgr.offline_hosts)
for h in self.get_hosts())
+ def schedule_daemon_action(self, host: str, daemon_name: str, action: str):
+ priorities = {
+ 'start': 1,
+ 'restart': 2,
+ 'reconfig': 3,
+ 'redeploy': 4,
+ 'stop': 5,
+ }
+ existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
+ if existing_action and priorities[existing_action] > priorities[action]:
+ logger.debug(
+ f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
+ return
+
+ if host not in self.scheduled_daemon_actions:
+ self.scheduled_daemon_actions[host] = {}
+ self.scheduled_daemon_actions[host][daemon_name] = action
+
+ def rm_scheduled_daemon_action(self, host: str, daemon_name: str):
+ if host in self.scheduled_daemon_actions:
+ if daemon_name in self.scheduled_daemon_actions[host]:
+ del self.scheduled_daemon_actions[host][daemon_name]
+ if not self.scheduled_daemon_actions[host]:
+ del self.scheduled_daemon_actions[host]
+
+ def get_scheduled_daemon_action(self, host, daemon) -> Optional[str]:
+ return self.scheduled_daemon_actions.get(host, {}).get(daemon)
+
class EventStore():
def __init__(self, mgr):