mgr/cephadm: offline host watcher 45248/head
author Adam King <adking@redhat.com>
Fri, 4 Mar 2022 02:47:47 +0000 (21:47 -0500)
committer Adam King <adking@redhat.com>
Sat, 26 Mar 2022 01:21:59 +0000 (21:21 -0400)
To be able to detect more quickly when certain
hosts go offline. Could be useful for the NFS
HA feature, as it requires moving nfs daemons
off of offline hosts within 90 seconds.

Signed-off-by: Adam King <adking@redhat.com>
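
For illustration only (not part of this commit): below is a minimal, standalone sketch of the polling pattern the new OfflineHostWatcher implements, i.e. a daemon thread that probes a set of hosts in parallel on a fixed interval and fires a callback when one stops answering. The probe and callback here are hypothetical stand-ins for cephadm's ssh.check_execute_command() and _kick_serve_loop(). With the 20-second poll interval and the SSH keepalive settings added in this commit (7s interval, at most 3 missed keepalives), an unresponsive host should normally be noticed well within the 90-second window mentioned above.

    import threading
    import time
    from multiprocessing.pool import ThreadPool
    from typing import Callable, List


    class HostPoller(threading.Thread):
        """Simplified stand-in for OfflineHostWatcher (see the new file below)."""

        def __init__(self, probe: Callable[[str], bool],
                     on_offline: Callable[[str], None],
                     interval: float = 20.0) -> None:
            super().__init__(daemon=True)
            self.probe = probe            # returns True if the host responded
            self.on_offline = on_offline  # e.g. kick the orchestrator serve loop
            self.interval = interval      # the real watcher waits 20 seconds
            self.hosts: List[str] = []
            self.stopping = False
            self.event = threading.Event()

        def set_hosts(self, hosts: List[str]) -> None:
            self.hosts = sorted(hosts)
            self.event.set()              # wake the loop so the new set is checked soon

        def run(self) -> None:
            pool = ThreadPool(10)         # probe up to 10 hosts concurrently
            while not self.stopping:
                if self.hosts:
                    pool.map(self.check, list(self.hosts))
                self.event.wait(self.interval)
                self.event.clear()
            pool.close()
            pool.join()

        def check(self, host: str) -> None:
            if not self.probe(host):
                self.on_offline(host)

        def shutdown(self) -> None:
            self.stopping = True
            self.event.set()


    if __name__ == '__main__':
        poller = HostPoller(probe=lambda h: h != 'host2',
                            on_offline=lambda h: print(f'{h} appears to be offline'))
        poller.start()
        poller.set_hosts(['host1', 'host2'])
        time.sleep(1)                     # let the first round of checks run
        poller.shutdown()
        poller.join()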
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/offline_watcher.py [new file with mode: 0644]
src/pybind/mgr/cephadm/schedule.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/ssh.py
src/pybind/mgr/cephadm/tests/fixtures.py
src/pybind/mgr/cephadm/utils.py

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index 96aee393aa34e1706f7f70c4e390ea1ee43865cb..d7747144a68d624f73e20cc0a6725d47679ca65a 100644 (file)
@@ -58,8 +58,10 @@ from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore,
     ClientKeyringStore, ClientKeyringSpec
 from .upgrade import CephadmUpgrade
 from .template import TemplateMgr
-from .utils import CEPH_IMAGE_TYPES, forall_hosts, cephadmNoImage
+from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
+    cephadmNoImage
 from .configchecks import CephadmConfigChecks
+from .offline_watcher import OfflineHostWatcher
 
 try:
     import asyncssh
@@ -544,11 +546,15 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         if self.use_agent:
             self.agent_helpers._apply_agent()
 
+        self.offline_watcher = OfflineHostWatcher(self)
+        self.offline_watcher.start()
+
     def shutdown(self) -> None:
         self.log.debug('shutdown')
         self._worker_pool.close()
         self._worker_pool.join()
         self.cherrypy_thread.shutdown()
+        self.offline_watcher.shutdown()
         self.run = False
         self.event.set()
 
@@ -774,6 +780,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.cache.save_host(host)
         return None
 
+    def update_watched_hosts(self) -> None:
+        # currently, we are watching hosts with nfs daemons
+        hosts_to_watch = [d.hostname for d in self.cache.get_daemons(
+        ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
+        self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None])))
+
     def offline_hosts_remove(self, host: str) -> None:
         if host in self.offline_hosts:
             self.offline_hosts.remove(host)
diff --git a/src/pybind/mgr/cephadm/offline_watcher.py b/src/pybind/mgr/cephadm/offline_watcher.py
new file mode 100644 (file)
index 0000000..b80f510
--- /dev/null
+++ b/src/pybind/mgr/cephadm/offline_watcher.py
@@ -0,0 +1,60 @@
+import logging
+from typing import List, Optional, TYPE_CHECKING
+
+import multiprocessing as mp
+import threading
+
+if TYPE_CHECKING:
+    from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+
+class OfflineHostWatcher(threading.Thread):
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        self.mgr = mgr
+        self.hosts: Optional[List[str]] = None
+        self.new_hosts: Optional[List[str]] = None
+        self.stop = False
+        self.event = threading.Event()
+        super(OfflineHostWatcher, self).__init__(target=self.run)
+
+    def run(self) -> None:
+        self.thread_pool = mp.pool.ThreadPool(10)
+        while not self.stop:
+            # only need to take action if we have hosts to check
+            if self.hosts or self.new_hosts:
+                if self.new_hosts:
+                    self.hosts = self.new_hosts
+                    self.new_hosts = None
+                logger.debug(f'OfflineHostDetector: Checking if hosts: {self.hosts} are offline.')
+                assert self.hosts is not None
+                self.thread_pool.map(self.check_host, self.hosts)
+            self.event.wait(20)
+            self.event.clear()
+        self.thread_pool.close()
+        self.thread_pool.join()
+
+    def check_host(self, host: str) -> None:
+        if host not in self.mgr.offline_hosts:
+            try:
+                self.mgr.ssh.check_execute_command(host, ['true'])
+            except Exception:
+                logger.debug(f'OfflineHostDetector: detected {host} to be offline')
+                # kick serve loop in case corrective action must be taken for offline host
+                self.mgr._kick_serve_loop()
+
+    def set_hosts(self, hosts: List[str]) -> None:
+        hosts.sort()
+        if (not self.hosts or self.hosts != hosts) and hosts:
+            self.new_hosts = hosts
+            logger.debug(
+                f'OfflineHostDetector: Hosts to check if offline swapped to: {self.new_hosts}.')
+            self.wakeup()
+
+    def wakeup(self) -> None:
+        self.event.set()
+
+    def shutdown(self) -> None:
+        self.stop = True
+        self.wakeup()
diff --git a/src/pybind/mgr/cephadm/schedule.py b/src/pybind/mgr/cephadm/schedule.py
index 9a8bad3c906292861bf515758c1a444e424c3f96..612c558043c8f59e600543c0a418f5c77915f547 100644 (file)
@@ -447,5 +447,6 @@ class HostAssignment(object):
                 continue
             in_maintenance[h.hostname] = False
         unreachable_hosts = [h.hostname for h in self.unreachable_hosts]
-        candidates = [c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
+        candidates = [
+            c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
         return candidates
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
index 57f2fff493fb2175ab2ecbc1aaadbd5eb56f63d1..4c031f3b50c90ad942616048e3fe4a61a11bf9bc 100644 (file)
@@ -513,7 +513,7 @@ class CephadmServe:
                                             f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                             len(self.mgr.apply_spec_fails),
                                             warnings)
-
+        self.mgr.update_watched_hosts()
         return r
 
     def _apply_service_config(self, spec: ServiceSpec) -> None:
diff --git a/src/pybind/mgr/cephadm/ssh.py b/src/pybind/mgr/cephadm/ssh.py
index 6ef71a943e863a9f4bfcd0c219e080282df6835e..5363ac058878d1cf7afd5e0fe411b666de2bb5ca 100644 (file)
@@ -75,7 +75,8 @@ class SSHManager:
 
             with self.redirect_log(host, addr):
                 try:
-                    ssh_options = asyncssh.SSHClientConnectionOptions(keepalive_interval=7, keepalive_count_max=3)
+                    ssh_options = asyncssh.SSHClientConnectionOptions(
+                        keepalive_interval=7, keepalive_count_max=3)
                     conn = await asyncssh.connect(addr, username=self.mgr.ssh_user, client_keys=[self.mgr.tkey.name],
                                                   known_hosts=None, config=[self.mgr.ssh_config_fname],
                                                   preferred_auth=['publickey'], options=ssh_options)
diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py
index 91c399b539c499b0c56719eeb40d1f9753943fe1..7a4ac0d873cd734f999df606ac4ac610b0db37fe 100644 (file)
@@ -98,7 +98,8 @@ def with_cephadm_module(module_options=None, store=None):
             mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
             mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
             mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
-            mock.patch('cephadm.agent.CherryPyThread.run'):
+            mock.patch('cephadm.agent.CherryPyThread.run'), \
+            mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'):
 
         m = CephadmOrchestrator.__new__(CephadmOrchestrator)
         if module_options is not None:
diff --git a/src/pybind/mgr/cephadm/utils.py b/src/pybind/mgr/cephadm/utils.py
index 3a5b564d59e830f57bdfcf5a01458c7a99fd2a19..28811fc3aceda1eff16e3891ed6e59d0678f127d 100644 (file)
@@ -23,7 +23,8 @@ class CephadmNoImage(Enum):
 # NOTE: order important here as these are used for upgrade order
 CEPH_TYPES = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror', 'cephfs-mirror']
 GATEWAY_TYPES = ['iscsi', 'nfs']
-MONITORING_STACK_TYPES = ['node-exporter', 'prometheus', 'alertmanager', 'grafana', 'loki', 'promtail']
+MONITORING_STACK_TYPES = ['node-exporter', 'prometheus',
+                          'alertmanager', 'grafana', 'loki', 'promtail']
 RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['nfs']
 
 CEPH_UPGRADE_ORDER = CEPH_TYPES + GATEWAY_TYPES + MONITORING_STACK_TYPES