]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: check agents more frequently
author Adam King <adking@redhat.com>
Sat, 23 Oct 2021 17:49:02 +0000 (13:49 -0400)
committer Adam King <adking@redhat.com>
Sat, 23 Oct 2021 17:50:45 +0000 (13:50 -0400)
To more quickly detect agents that are down or whose
dependencies have changed.

Signed-off-by: Adam King <adking@redhat.com>
src/pybind/mgr/cephadm/agent.py
src/pybind/mgr/cephadm/serve.py

index 12bd0bf6f4aa380fb55eb7378a9f69beeccc6fce..8998c1e63f561821fddc3429621f606fda09da7c 100644 (file)
@@ -17,13 +17,14 @@ from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 
 from datetime import datetime, timedelta
 from OpenSSL import crypto
+from contextlib import contextmanager
 from cryptography import x509
 from cryptography.x509.oid import NameOID
 from cryptography.hazmat.primitives.asymmetric import rsa
 from cryptography.hazmat.primitives import hashes, serialization
 from cryptography.hazmat.backends import default_backend
 
-from typing import Any, Dict, List, Set, Tuple, TYPE_CHECKING
+from typing import Any, Dict, Iterator, List, Set, Tuple, TYPE_CHECKING
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
@@ -72,7 +73,11 @@ class CherryPyThread(threading.Thread):
             self.mgr.log.info('Starting cherrypy engine...')
             cherrypy.engine.start()
             self.mgr.log.info('Cherrypy engine started.')
-            self.mgr._kick_serve_loop()
+            agents_down = []
+            for h in self.mgr.cache.get_hosts():
+                if self.mgr.agent_helpers._check_agent(h):
+                    agents_down.append(h)
+            self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
             # wait for the shutdown event
             self.cherrypy_shutdown_event.wait()
             self.cherrypy_shutdown_event.clear()
@@ -172,7 +177,7 @@ class HostData:
 
             agents_down = []
             for h in self.mgr.cache.get_hosts():
-                if self.mgr.agent_helpers._agent_down(h):
+                if self.mgr.agent_helpers._check_agent(h):
                     agents_down.append(h)
             self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
 
@@ -300,6 +305,7 @@ class AgentMessageThread(threading.Thread):
 class CephadmAgentHelpers:
     def __init__(self, mgr: "CephadmOrchestrator"):
         self.mgr: "CephadmOrchestrator" = mgr
+        self.agent_locks: Dict[str, threading.Lock] = {}
 
     def _request_agent_acks(self, hosts: Set[str], increment: bool = False) -> None:
         for host in hosts:
@@ -386,7 +392,26 @@ class CephadmAgentHelpers:
             self.mgr.cache.agent_ports = {}
         return need_apply
 
+    @contextmanager
+    def agent_lock(self, host: str) -> Iterator[threading.Lock]:
+        if host not in self.mgr.agent_helpers.agent_locks:
+            self.mgr.agent_helpers.agent_locks[host] = threading.Lock()
+        lock = self.mgr.agent_helpers.agent_locks[host]
+        if not lock.acquire(False):
+            raise Exception('Agent lock in use')
+        try:
+            yield lock
+        finally:
+            lock.release()
+
     def _check_agent(self, host: str) -> bool:
+        try:
+            assert self.mgr.cherrypy_thread
+            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+        except Exception:
+            self.mgr.log.debug(
+                f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
+            return False
         if self.mgr.agent_helpers._agent_down(host):
             if host not in self.mgr.offline_hosts:
                 self.mgr.cache.metadata_up_to_date[host] = False
@@ -398,26 +423,46 @@ class CephadmAgentHelpers:
                     # try to schedule redeploy of agent in case it is individually down
                     agent = [a for a in self.mgr.cache.get_daemons_by_type(
                         'agent') if a.hostname == host][0]
-                    self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
+                    with self.mgr.agent_helpers.agent_lock(host):
+                        daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
+                        self.mgr._daemon_action(daemon_spec, action='redeploy')
                 except Exception as e:
                     self.mgr.log.debug(
-                        f'Failed to find entry for agent deployed on host {host}. Agent possibly never deployed: {e}')
+                        f'Failed to redeploy agent on host {host}. Agent possibly never deployed: {e}')
             return True
         else:
             try:
                 agent = [a for a in self.mgr.cache.get_daemons_by_type(
                     'agent') if a.hostname == host][0]
                 assert agent.daemon_id is not None
+                assert agent.hostname is not None
+            except Exception as e:
+                self.mgr.log.debug(
+                    f'Could not retrieve agent on host {host} from daemon cache: {e}')
+                return False
+            try:
                 spec = self.mgr.spec_store.active_specs.get('agent', None)
                 deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
                 last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                     host, agent.name())
                 if not last_config or last_deps != deps:
                     daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
-                    self.mgr._daemon_action(daemon_spec, action='reconfig')
+                    with self.mgr.agent_helpers.agent_lock(host):
+                        self.mgr._daemon_action(daemon_spec, action='reconfig')
+                    return False
             except Exception as e:
                 self.mgr.log.debug(
                     f'Agent on host {host} not ready to have config and deps checked: {e}')
+            action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
+            if action:
+                try:
+                    with self.mgr.agent_helpers.agent_lock(host):
+                        daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
+                        self.mgr._daemon_action(daemon_spec, action=action)
+                        self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
+                except Exception as e:
+                    self.mgr.log.debug(
+                        f'Agent on host {host} not ready to {action}: {e}')
             return False
 
 
index 41a5bebcd6f248f82fb60566510a7094ff5891b9..4a721b50fa301bdefae29bfd1cceeae42a4b61cc 100644 (file)
@@ -909,12 +909,11 @@ class CephadmServe:
 
             if dd.daemon_type == 'agent':
                 try:
-                    assert self.mgr.cherrypy_thread
-                    assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+                    self.mgr.agent_helpers._check_agent(dd.hostname)
                 except Exception:
-                    self.log.info(
-                        f'Delaying checking {dd.name()} until cephadm endpoint finished creating root cert')
-                    continue
+                    self.log.debug(
+                        f'Agent {dd.name()} could not be checked in _check_daemons')
+                continue
 
             # These daemon types require additional configs after creation
             if dd.daemon_type in REQUIRES_POST_ACTIONS: