]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: still check agent deps if it is marked down
authorAdam King <adking@redhat.com>
Thu, 6 Jan 2022 22:01:34 +0000 (17:01 -0500)
committerAdam King <adking@redhat.com>
Fri, 28 Jan 2022 16:23:51 +0000 (11:23 -0500)
Right now if an agent is down, the way _check_agent works
if will return without ever going on to check the deps or
scheduled actions for that agent. This causes a few issues.
For one, if an agent is marked down and then a mgr failover
happens, even if reconfiguring the agent would put it in a working
state (e.g. changing the target ip if the active mgr has moved)
we never try it because _check_agent just returns as soon as it
sees the agent is down. Additionally, if someone purposely tried
to schedule a redeploy of a down agent for whatever reason, we
would never make good on this action.

This change allows us to still carry out the normal checks/
scheduled actions even on down agents

Fixes: https://tracker.ceph.com/issues/53723
Signed-off-by: Adam King <adking@redhat.com>
(cherry picked from commit 09a593c8d56adabb01f2aeb0859e7885994ce687)

src/pybind/mgr/cephadm/agent.py
src/pybind/mgr/cephadm/inventory.py

index c61eb74d83fa370b0a73c402591433657c1dcea0..7f65190a9754519d8dc508999ca913f04093d03c 100644 (file)
@@ -269,12 +269,13 @@ class HostData:
 
 
 class AgentMessageThread(threading.Thread):
-    def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator") -> None:
+    def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
         self.mgr = mgr
         self.host = host
         self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
         self.port = port
         self.data: str = json.dumps(data)
+        self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec
         super(AgentMessageThread, self).__init__(target=self.run)
 
     def run(self) -> None:
@@ -328,6 +329,8 @@ class AgentMessageThread(threading.Thread):
                 secure_agent_socket.sendall(msg.encode('utf-8'))
                 agent_response = secure_agent_socket.recv(1024).decode()
                 self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
+                if self.daemon_spec:
+                    self.mgr.cache.agent_config_successfully_delivered(self.daemon_spec)
                 self.mgr.cache.sending_agent_message[self.host] = False
                 return
             except ConnectionError as e:
@@ -346,15 +349,11 @@ class AgentMessageThread(threading.Thread):
         return
 
 
-class AgentLockException(Exception):
-    pass
-
-
 class CephadmAgentHelpers:
     def __init__(self, mgr: "CephadmOrchestrator"):
         self.mgr: "CephadmOrchestrator" = mgr
 
-    def _request_agent_acks(self, hosts: Set[str], increment: bool = False, new_config: Optional[Dict[str, str]] = None) -> None:
+    def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
         for host in hosts:
             if increment:
                 self.mgr.cache.metadata_up_to_date[host] = False
@@ -363,10 +362,10 @@ class CephadmAgentHelpers:
             elif increment:
                 self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1
             payload: Dict[str, Any] = {'counter': self.mgr.cache.agent_counter[host]}
-            if new_config:
-                payload['config'] = new_config
+            if daemon_spec:
+                payload['config'] = daemon_spec.final_config
             message_thread = AgentMessageThread(
-                host, self.mgr.cache.agent_ports[host], payload, self.mgr)
+                host, self.mgr.cache.agent_ports[host], payload, self.mgr, daemon_spec)
             message_thread.start()
 
     def _request_ack_all_not_up_to_date(self) -> None:
@@ -387,9 +386,9 @@ class CephadmAgentHelpers:
         # just set the timestamp to now. However, if host was offline before, we
         # should not allow creating a new timestamp to cause it to be marked online
         if host not in self.mgr.cache.agent_timestamp:
-            self.mgr.cache.agent_timestamp[host] = datetime_now()
             if host in self.mgr.offline_hosts:
                 return False
+            self.mgr.cache.agent_timestamp[host] = datetime_now()
         # agent hasn't reported in down multiplier * it's refresh rate. Something is likely wrong with it.
         down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
         time_diff = datetime_now() - self.mgr.cache.agent_timestamp[host]
@@ -451,71 +450,66 @@ class CephadmAgentHelpers:
         return need_apply
 
     def _check_agent(self, host: str) -> bool:
+        down = False
         try:
             assert self.mgr.cherrypy_thread
             assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
         except Exception:
             self.mgr.log.debug(
                 f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
-            return False
+            return down
         if self.mgr.agent_helpers._agent_down(host):
-            return True
-        else:
-            try:
-                agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
-                assert agent.daemon_id is not None
-                assert agent.hostname is not None
-            except Exception as e:
-                self.mgr.log.debug(
-                    f'Could not retrieve agent on host {host} from daemon cache: {e}')
-                return False
+            down = True
+        try:
+            agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
+            assert agent.daemon_id is not None
+            assert agent.hostname is not None
+        except Exception as e:
+            self.mgr.log.debug(
+                f'Could not retrieve agent on host {host} from daemon cache: {e}')
+            return down
+        try:
+            spec = self.mgr.spec_store.active_specs.get('agent', None)
+            deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
+            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
+                host, agent.name())
+            if not last_config or last_deps != deps:
+                # if root cert is the dep that changed, we must use ssh to reconfig
+                # so it's necessary to check this one specifically
+                root_cert_match = False
+                try:
+                    root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+                    if last_deps and root_cert in last_deps:
+                        root_cert_match = True
+                except Exception:
+                    pass
+                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
+                # we need to know the agent port to try to reconfig w/ http
+                # otherwise there is no choice but a full ssh reconfig
+                if host in self.mgr.cache.agent_ports and root_cert_match and not down:
+                    daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
+                        daemon_spec.daemon_type)].prepare_create(daemon_spec)
+                    self.mgr.agent_helpers._request_agent_acks(
+                        hosts={daemon_spec.host},
+                        increment=True,
+                        daemon_spec=daemon_spec,
+                    )
+                else:
+                    self.mgr._daemon_action(daemon_spec, action='reconfig')
+                return down
+        except Exception as e:
+            self.mgr.log.debug(
+                f'Agent on host {host} not ready to have config and deps checked: {e}')
+        action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
+        if action:
             try:
-                spec = self.mgr.spec_store.active_specs.get('agent', None)
-                deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
-                last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
-                    host, agent.name())
-                if not last_config or last_deps != deps:
-                    # if root cert is the dep that changed, we must use ssh to reconfig
-                    # so it's necessary to check this one specifically
-                    root_cert_match = False
-                    try:
-                        root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
-                        if last_deps and root_cert in last_deps:
-                            root_cert_match = True
-                    except Exception:
-                        pass
-                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
-                    # we need to know the agent port to try to reconfig w/ http
-                    # otherwise there is no choice but a full ssh reconfig
-                    if host in self.mgr.cache.agent_ports and root_cert_match:
-                        daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
-                            daemon_spec.daemon_type)].prepare_create(daemon_spec)
-                        self.mgr.cache.agent_timestamp[daemon_spec.host] = datetime_now()
-                        self.mgr.cache.agent_counter[daemon_spec.host] = 1
-                        self.mgr.agent_helpers._request_agent_acks(
-                            hosts={daemon_spec.host},
-                            increment=True,
-                            new_config=daemon_spec.final_config
-                        )
-                        self.mgr.cache.update_daemon_config_deps(
-                            daemon_spec.host, daemon_spec.name(), daemon_spec.deps, datetime_now())
-                        self.mgr.cache.save_host(daemon_spec.host)
-                    else:
-                        self.mgr._daemon_action(daemon_spec, action='reconfig')
-                    return False
+                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
+                self.mgr._daemon_action(daemon_spec, action=action)
+                self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
             except Exception as e:
                 self.mgr.log.debug(
-                    f'Agent on host {host} not ready to have config and deps checked: {e}')
-            action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
-            if action:
-                try:
-                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
-                    self.mgr._daemon_action(daemon_spec, action=action)
-                    self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
-                except Exception as e:
-                    self.mgr.log.debug(
-                        f'Agent on host {host} not ready to {action}: {e}')
-            return False
+                    f'Agent on host {host} not ready to {action}: {e}')
+        return down
 
 
 class SSLCerts:
index da4673cd88c7fe2c74b92d78b9179c7017661a46..15adc91a55ab4fcad4c40d1f4081f7e07aea83b8 100644 (file)
@@ -12,6 +12,7 @@ from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
 from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 
 from .utils import resolve_ip
 from .migrations import queue_migrate_nfs_spec
@@ -1026,6 +1027,15 @@ class HostCache():
             return False
         return True
 
+    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
+        # agent successfully received new config. Update config/deps
+        assert daemon_spec.service_name == 'agent'
+        self.update_daemon_config_deps(
+            daemon_spec.host, daemon_spec.name(), daemon_spec.deps, datetime_now())
+        self.agent_timestamp[daemon_spec.host] = datetime_now()
+        self.agent_counter[daemon_spec.host] = 1
+        self.save_host(daemon_spec.host)
+
     def add_daemon(self, host, dd):
         # type: (str, orchestrator.DaemonDescription) -> None
         assert host in self.daemons