git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: offline host watcher
authorAdam King <adking@redhat.com>
Fri, 4 Mar 2022 02:47:47 +0000 (21:47 -0500)
committerAdam King <adking@redhat.com>
Tue, 5 Apr 2022 20:10:22 +0000 (16:10 -0400)
To be able to detect more quickly when certain hosts go
offline. Could be useful for the NFS
HA feature, as this requires moving NFS daemons from
offline hosts within 90 seconds.

Signed-off-by: Adam King <adking@redhat.com>
(cherry picked from commit bd9eb596570cfcc7fea793c2b380bc66dd719439)

Conflicts:
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/ssh.py
src/pybind/mgr/cephadm/tests/fixtures.py
src/pybind/mgr/cephadm/utils.py

src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/offline_watcher.py [new file with mode: 0644]
src/pybind/mgr/cephadm/schedule.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/tests/fixtures.py

index 94c42df9abf11e6c4327979c4a429116ef3212ee..607afd51b0dac9c9f1f278253298cbfef9460e6f 100644 (file)
@@ -59,8 +59,10 @@ from .schedule import HostAssignment
 from .inventory import Inventory, SpecStore, HostCache, EventStore, ClientKeyringStore, ClientKeyringSpec
 from .upgrade import CephadmUpgrade
 from .template import TemplateMgr
-from .utils import CEPH_IMAGE_TYPES, forall_hosts, cephadmNoImage
+from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
+    cephadmNoImage
 from .configchecks import CephadmConfigChecks
+from .offline_watcher import OfflineHostWatcher
 
 try:
     import remoto
@@ -499,10 +501,14 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.config_checker = CephadmConfigChecks(self)
 
+        self.offline_watcher = OfflineHostWatcher(self)
+        self.offline_watcher.start()
+
     def shutdown(self) -> None:
         self.log.debug('shutdown')
         self._worker_pool.close()
         self._worker_pool.join()
+        self.offline_watcher.shutdown()
         self.run = False
         self.event.set()
 
@@ -729,6 +735,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             conn.exit()
         self._cons = {}
 
+    def update_watched_hosts(self) -> None:
+        """Refresh the set of hosts the offline watcher should monitor.
+
+        Collects the hostname of every cached daemon whose type is listed in
+        RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES (per the original note, this
+        currently means hosts running nfs daemons), drops daemons without a
+        hostname, de-duplicates, and hands the result to the watcher.
+        """
+        watched = {
+            daemon.hostname
+            for daemon in self.cache.get_daemons()
+            if daemon.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES
+            and daemon.hostname is not None
+        }
+        self.offline_watcher.set_hosts(list(watched))
+
     def offline_hosts_remove(self, host: str) -> None:
         if host in self.offline_hosts:
             self.offline_hosts.remove(host)
diff --git a/src/pybind/mgr/cephadm/offline_watcher.py b/src/pybind/mgr/cephadm/offline_watcher.py
new file mode 100644 (file)
index 0000000..006156f
--- /dev/null
@@ -0,0 +1,70 @@
+import logging
+from typing import List, Optional, TYPE_CHECKING
+
+import multiprocessing as mp
+import threading
+
+from cephadm.serve import CephadmServe
+
+try:
+    import remoto
+except ImportError:
+    remoto = None
+
+
+if TYPE_CHECKING:
+    from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+
+class OfflineHostWatcher(threading.Thread):
+    """Background thread that periodically probes a set of hosts.
+
+    Every 20 seconds (or sooner, when woken via ``wakeup``) the thread tries
+    to open a remote connection to each watched host that is not already in
+    ``mgr.offline_hosts``. If a probe fails, the serve loop is kicked so that
+    it can take corrective action without waiting for its next regular pass.
+    """
+
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        """Create the watcher; the thread is not started here.
+
+        :param mgr: orchestrator module whose ``offline_hosts`` is consulted
+            and whose serve loop is kicked when a probe fails.
+        """
+        self.mgr = mgr
+        # hosts currently being probed by the run loop
+        self.hosts: Optional[List[str]] = None
+        # replacement host list handed over by set_hosts(); picked up (and
+        # reset to None) by the run loop on its next iteration
+        self.new_hosts: Optional[List[str]] = None
+        self.stop = False
+        self.event = threading.Event()
+        # run() is overridden below, so no explicit target is needed
+        super().__init__()
+
+    def run(self) -> None:
+        """Probe the watched hosts in a loop until ``shutdown`` is called."""
+        # ``import multiprocessing`` alone does not load the ``pool``
+        # submodule; import it explicitly instead of relying on some other
+        # module having done so already.
+        from multiprocessing.pool import ThreadPool
+
+        self.thread_pool = ThreadPool(10)
+        try:
+            while not self.stop:
+                if self.new_hosts:
+                    self.hosts = self.new_hosts
+                    self.new_hosts = None
+                # only need to take action if we have hosts to check
+                if self.hosts:
+                    logger.debug(f'OfflineHostDetector: Checking if hosts: {self.hosts} are offline.')
+                    self.thread_pool.map(self.check_host, self.hosts)
+                self.event.wait(20)
+                self.event.clear()
+        finally:
+            # always release the worker threads, even if the loop dies
+            self.thread_pool.close()
+            self.thread_pool.join()
+
+    def check_host(self, host: str) -> None:
+        """Probe a single host by running ``true`` over a remote connection.
+
+        Hosts already listed in ``mgr.offline_hosts`` are skipped. Any
+        exception raised while connecting or running the command is treated
+        as "host is offline".
+
+        :param host: hostname to probe.
+        """
+        if host in self.mgr.offline_hosts:
+            return
+        try:
+            with CephadmServe(self.mgr)._remote_connection(host) as (conn, _connr):
+                remoto.process.check(conn, ['true'])
+        except Exception:
+            logger.debug(f'OfflineHostDetector: detected {host} to be offline')
+            # kick serve loop in case corrective action must be taken for offline host
+            self.mgr._kick_serve_loop()
+
+    def set_hosts(self, hosts: List[str]) -> None:
+        """Request a new list of hosts to watch.
+
+        An empty list, or a list equal (after sorting) to the one currently
+        being watched, is ignored. Otherwise the list is queued for the run
+        loop and the thread is woken up.
+
+        :param hosts: hostnames to watch; the caller's list is not modified.
+        """
+        # sort a copy so the caller's list is left untouched
+        wanted = sorted(hosts)
+        if wanted and self.hosts != wanted:
+            self.new_hosts = wanted
+            logger.debug(
+                f'OfflineHostDetector: Hosts to check if offline swapped to: {self.new_hosts}.')
+            self.wakeup()
+
+    def wakeup(self) -> None:
+        """Interrupt the run loop's 20 second wait."""
+        self.event.set()
+
+    def shutdown(self) -> None:
+        """Ask the run loop to exit and wake it so it notices promptly."""
+        self.stop = True
+        self.wakeup()
index 9a8bad3c906292861bf515758c1a444e424c3f96..612c558043c8f59e600543c0a418f5c77915f547 100644 (file)
@@ -447,5 +447,6 @@ class HostAssignment(object):
                 continue
             in_maintenance[h.hostname] = False
         unreachable_hosts = [h.hostname for h in self.unreachable_hosts]
-        candidates = [c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
+        candidates = [
+            c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
         return candidates
index 939b50bd30b9abdff01e46296b0bd0611937efe3..34ae333d6678ed1d6b94a10a200ab0d734db7649 100644 (file)
@@ -518,7 +518,7 @@ class CephadmServe:
                                             f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                             len(self.mgr.apply_spec_fails),
                                             warnings)
-
+        self.mgr.update_watched_hosts()
         return r
 
     def _apply_service_config(self, spec: ServiceSpec) -> None:
index acacfb8ab4337395d1153d2d40b8827a6f38d291..40a8ad6360ca828e6ee49e57cdd6ab6990407194 100644 (file)
@@ -42,7 +42,8 @@ def with_cephadm_module(module_options=None, store=None):
     with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
             mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
             mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
-            mock.patch("cephadm.module.CephadmOrchestrator.remote"):
+            mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
+            mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'):
 
         m = CephadmOrchestrator.__new__(CephadmOrchestrator)
         if module_options is not None: