ClientKeyringStore, ClientKeyringSpec
from .upgrade import CephadmUpgrade
from .template import TemplateMgr
-from .utils import CEPH_IMAGE_TYPES, forall_hosts, cephadmNoImage
+from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
+ cephadmNoImage
from .configchecks import CephadmConfigChecks
+from .offline_watcher import OfflineHostWatcher
try:
import asyncssh
if self.use_agent:
self.agent_helpers._apply_agent()
+ self.offline_watcher = OfflineHostWatcher(self)
+ self.offline_watcher.start()
+
def shutdown(self) -> None:
self.log.debug('shutdown')
self._worker_pool.close()
self._worker_pool.join()
self.cherrypy_thread.shutdown()
+ self.offline_watcher.shutdown()
self.run = False
self.event.set()
self.cache.save_host(host)
return None
+ def update_watched_hosts(self) -> None:
+ # currently, we are watching hosts with nfs daemons
+        hosts_to_watch = [d.hostname for d in self.cache.get_daemons()
+                          if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
+ self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None])))
+
def offline_hosts_remove(self, host: str) -> None:
if host in self.offline_hosts:
self.offline_hosts.remove(host)
--- /dev/null
+import logging
+from typing import List, Optional, TYPE_CHECKING
+
+import threading
+from multiprocessing.pool import ThreadPool
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+
+class OfflineHostWatcher(threading.Thread):
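+    """Background thread that periodically probes a set of hosts over ssh
+    and kicks the cephadm serve loop when one of them appears to be offline."""
+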
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr = mgr
+ self.hosts: Optional[List[str]] = None
+ self.new_hosts: Optional[List[str]] = None
+ self.stop = False
+ self.event = threading.Event()
+ super(OfflineHostWatcher, self).__init__(target=self.run)
+
+ def run(self) -> None:
+        # probe up to 10 hosts in parallel per pass
+        self.thread_pool = ThreadPool(10)
+ while not self.stop:
+ # only need to take action if we have hosts to check
+ if self.hosts or self.new_hosts:
+ if self.new_hosts:
+ self.hosts = self.new_hosts
+ self.new_hosts = None
+                logger.debug(f'OfflineHostWatcher: checking if hosts {self.hosts} are offline.')
+ assert self.hosts is not None
+ self.thread_pool.map(self.check_host, self.hosts)
+ self.event.wait(20)
+ self.event.clear()
+ self.thread_pool.close()
+ self.thread_pool.join()
+
+ def check_host(self, host: str) -> None:
+ if host not in self.mgr.offline_hosts:
+ try:
+ self.mgr.ssh.check_execute_command(host, ['true'])
+ except Exception:
+                logger.debug(f'OfflineHostWatcher: detected {host} to be offline')
+ # kick serve loop in case corrective action must be taken for offline host
+ self.mgr._kick_serve_loop()
+
+ def set_hosts(self, hosts: List[str]) -> None:
+ hosts.sort()
+ if (not self.hosts or self.hosts != hosts) and hosts:
+ self.new_hosts = hosts
+            logger.debug(
+                f'OfflineHostWatcher: host list to check swapped to: {self.new_hosts}.')
+ self.wakeup()
+
+ def wakeup(self) -> None:
+ self.event.set()
+
+ def shutdown(self) -> None:
+ self.stop = True
+ self.wakeup()
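
Below is a minimal sketch (not part of the patch) of how the new watcher behaves, driving OfflineHostWatcher against unittest.mock doubles instead of a real CephadmOrchestrator; the mocked mgr object, the forced ssh failure, and the sleep-based timing are illustrative assumptions only:

    # Illustrative only: exercise OfflineHostWatcher with a mocked mgr.
    import time
    from unittest import mock

    from cephadm.offline_watcher import OfflineHostWatcher

    mgr = mock.MagicMock()
    mgr.offline_hosts = set()  # nothing is known to be offline yet
    # make every ssh 'true' probe fail, as an unreachable host would
    mgr.ssh.check_execute_command.side_effect = Exception('unreachable')

    watcher = OfflineHostWatcher(mgr)
    watcher.start()
    watcher.set_hosts(['host1', 'host2'])  # wakes the thread immediately
    time.sleep(2)                          # give one probe pass time to run
    watcher.shutdown()
    watcher.join()

    # each failed probe kicked the serve loop, so the orchestrator can
    # reschedule nfs daemons away from the offline hosts
    assert mgr._kick_serve_loop.called

The ThreadPool keeps one hung probe from delaying checks on the other hosts, and the event.wait(20) keeps polling cheap between passes while still letting set_hosts() and shutdown() wake the thread immediately.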
continue
in_maintenance[h.hostname] = False
unreachable_hosts = [h.hostname for h in self.unreachable_hosts]
- candidates = [c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
+        candidates = [c for c in candidates
+                      if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
return candidates
f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
len(self.mgr.apply_spec_fails),
warnings)
-
+ self.mgr.update_watched_hosts()
return r
def _apply_service_config(self, spec: ServiceSpec) -> None:
with self.redirect_log(host, addr):
try:
- ssh_options = asyncssh.SSHClientConnectionOptions(keepalive_interval=7, keepalive_count_max=3)
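+                # send an ssh keepalive every 7s; treat the connection as dead
+                # after 3 unanswered keepalives (~21s)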
+ ssh_options = asyncssh.SSHClientConnectionOptions(
+ keepalive_interval=7, keepalive_count_max=3)
conn = await asyncssh.connect(addr, username=self.mgr.ssh_user, client_keys=[self.mgr.tkey.name],
known_hosts=None, config=[self.mgr.ssh_config_fname],
preferred_auth=['publickey'], options=ssh_options)
mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
- mock.patch('cephadm.agent.CherryPyThread.run'):
+ mock.patch('cephadm.agent.CherryPyThread.run'), \
+ mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'):
m = CephadmOrchestrator.__new__(CephadmOrchestrator)
if module_options is not None:
# NOTE: order important here as these are used for upgrade order
CEPH_TYPES = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror', 'cephfs-mirror']
GATEWAY_TYPES = ['iscsi', 'nfs']
-MONITORING_STACK_TYPES = ['node-exporter', 'prometheus', 'alertmanager', 'grafana', 'loki', 'promtail']
+MONITORING_STACK_TYPES = ['node-exporter', 'prometheus',
+ 'alertmanager', 'grafana', 'loki', 'promtail']
+RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['nfs']
CEPH_UPGRADE_ORDER = CEPH_TYPES + GATEWAY_TYPES + MONITORING_STACK_TYPES
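
For reference, the concatenation above expands to the following upgrade order (values copied verbatim from the lists in this hunk; ceph daemons first, then gateways, then the monitoring stack):

    ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror', 'cephfs-mirror',
     'iscsi', 'nfs',
     'node-exporter', 'prometheus', 'alertmanager', 'grafana', 'loki', 'promtail']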