To be able to detect if certain offline hosts go
offline quicker. Could be useful for the NFS
HA feature as this requires moving nfs daemons from
offline hosts within 90 seconds.
Signed-off-by: Adam King <adking@redhat.com>
(cherry picked from commit
bd9eb596570cfcc7fea793c2b380bc66dd719439)
Conflicts:
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/ssh.py
src/pybind/mgr/cephadm/tests/fixtures.py
src/pybind/mgr/cephadm/utils.py
from .inventory import Inventory, SpecStore, HostCache, EventStore, ClientKeyringStore, ClientKeyringSpec
from .upgrade import CephadmUpgrade
from .template import TemplateMgr
-from .utils import CEPH_IMAGE_TYPES, forall_hosts, cephadmNoImage
+from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
+ cephadmNoImage
from .configchecks import CephadmConfigChecks
+from .offline_watcher import OfflineHostWatcher
try:
import remoto
self.config_checker = CephadmConfigChecks(self)
+ self.offline_watcher = OfflineHostWatcher(self)
+ self.offline_watcher.start()
+
def shutdown(self) -> None:
self.log.debug('shutdown')
self._worker_pool.close()
self._worker_pool.join()
+ self.offline_watcher.shutdown()
self.run = False
self.event.set()
conn.exit()
self._cons = {}
+ def update_watched_hosts(self) -> None:
+ # currently, we are watching hosts with nfs daemons
+ hosts_to_watch = [d.hostname for d in self.cache.get_daemons(
+ ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
+ self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None])))
+
def offline_hosts_remove(self, host: str) -> None:
if host in self.offline_hosts:
self.offline_hosts.remove(host)
--- /dev/null
+import logging
+from typing import List, Optional, TYPE_CHECKING
+
+import multiprocessing as mp
+import threading
+
+from cephadm.serve import CephadmServe
+
+try:
+ import remoto
+except ImportError:
+ remoto = None
+
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+logger = logging.getLogger(__name__)
+
+
+class OfflineHostWatcher(threading.Thread):
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr = mgr
+ self.hosts: Optional[List[str]] = None
+ self.new_hosts: Optional[List[str]] = None
+ self.stop = False
+ self.event = threading.Event()
+ super(OfflineHostWatcher, self).__init__(target=self.run)
+
+ def run(self) -> None:
+ self.thread_pool = mp.pool.ThreadPool(10)
+ while not self.stop:
+ # only need to take action if we have hosts to check
+ if self.hosts or self.new_hosts:
+ if self.new_hosts:
+ self.hosts = self.new_hosts
+ self.new_hosts = None
+ logger.debug(f'OfflineHostDetector: Checking if hosts: {self.hosts} are offline.')
+ assert self.hosts is not None
+ self.thread_pool.map(self.check_host, self.hosts)
+ self.event.wait(20)
+ self.event.clear()
+ self.thread_pool.close()
+ self.thread_pool.join()
+
+ def check_host(self, host: str) -> None:
+ if host not in self.mgr.offline_hosts:
+ try:
+ with CephadmServe(self.mgr)._remote_connection(host) as tpl:
+ conn, connr = tpl
+ out, err, code = remoto.process.check(conn, ['true'])
+ except Exception:
+ logger.debug(f'OfflineHostDetector: detected {host} to be offline')
+ # kick serve loop in case corrective action must be taken for offline host
+ self.mgr._kick_serve_loop()
+
+ def set_hosts(self, hosts: List[str]) -> None:
+ hosts.sort()
+ if (not self.hosts or self.hosts != hosts) and hosts:
+ self.new_hosts = hosts
+ logger.debug(
+ f'OfflineHostDetector: Hosts to check if offline swapped to: {self.new_hosts}.')
+ self.wakeup()
+
+ def wakeup(self) -> None:
+ self.event.set()
+
+ def shutdown(self) -> None:
+ self.stop = True
+ self.wakeup()
continue
in_maintenance[h.hostname] = False
unreachable_hosts = [h.hostname for h in self.unreachable_hosts]
- candidates = [c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
+ candidates = [
+ c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
return candidates
f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
len(self.mgr.apply_spec_fails),
warnings)
-
+ self.mgr.update_watched_hosts()
return r
def _apply_service_config(self, spec: ServiceSpec) -> None:
with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
- mock.patch("cephadm.module.CephadmOrchestrator.remote"):
+ mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
+ mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'):
m = CephadmOrchestrator.__new__(CephadmOrchestrator)
if module_options is not None: