git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
cephadm: gracefully shut down the agent prior to removing it
author: Guillaume Abrioux <gabrioux@ibm.com>
Fri, 1 Dec 2023 08:03:58 +0000 (08:03 +0000)
committer: Guillaume Abrioux <gabrioux@ibm.com>
Thu, 25 Jan 2024 15:16:26 +0000 (15:16 +0000)
When the agent is removed, the daemon is stopped abruptly.
Since the node-proxy logic runs from within the cephadm agent,
this leaves an active Redfish session behind. The idea is to gracefully
shut down the agent so that node-proxy can catch that event and make sure
it closes the currently active Redfish session prior to shutting down.

Signed-off-by: Guillaume Abrioux <gabrioux@ibm.com>
(cherry picked from commit 79bfe642001a7f9e1da28f987d1edb45174f6e86)

src/cephadm/cephadm.py
src/pybind/mgr/cephadm/agent.py
src/pybind/mgr/cephadm/services/cephadmservice.py

index e42647ebd8cf2d3420a2e18d3d6182f4bbcbc079..5c3084acd473d072cf09ae51bdca847f4972894c 100755 (executable)
@@ -4719,12 +4719,13 @@ class MgrListener(Thread):
                         conn.send(err_str.encode())
                         logger.error(err_str)
                     else:
-                        conn.send(b'ACK')
-                        if 'config' in data:
-                            self.agent.wakeup()
-                        self.agent.ls_gatherer.wakeup()
-                        self.agent.volume_gatherer.wakeup()
-                        logger.debug(f'Got mgr message {data}')
+                        if 'counter' in data:
+                            conn.send(b'ACK')
+                            if 'config' in data:
+                                self.agent.wakeup()
+                            self.agent.ls_gatherer.wakeup()
+                            self.agent.volume_gatherer.wakeup()
+                            logger.debug(f'Got mgr message {data}')
             except Exception as e:
                 logger.error(f'Mgr Listener encountered exception: {e}')
 
@@ -4732,17 +4733,23 @@ class MgrListener(Thread):
         self.stop = True
 
     def handle_json_payload(self, data: Dict[Any, Any]) -> None:
-        self.agent.ack = int(data['counter'])
-        if 'config' in data:
-            logger.info('Received new config from mgr')
-            config = data['config']
-            for filename in config:
-                if filename in self.agent.required_files:
-                    file_path = os.path.join(self.agent.daemon_dir, filename)
-                    with write_new(file_path) as f:
-                        f.write(config[filename])
-            self.agent.pull_conf_settings()
-            self.agent.wakeup()
+        if 'counter' in data:
+            self.agent.ack = int(data['counter'])
+            if 'config' in data:
+                logger.info('Received new config from mgr')
+                config = data['config']
+                for filename in config:
+                    if filename in self.agent.required_files:
+                        file_path = os.path.join(self.agent.daemon_dir, filename)
+                        with write_new(file_path) as f:
+                            f.write(config[filename])
+                self.agent.pull_conf_settings()
+                self.agent.wakeup()
+        elif 'node_proxy_shutdown' in data:
+            self.agent.shutdown()
+        else:
+            raise RuntimeError('No valid data received.')
+
 
 
 class CephadmAgent():
index d5f7d3161cf9f9c85dbb38860728627dc68c350e..03266a6c7a75f462e75f521caf606005b426bdaa 100644 (file)
@@ -896,6 +896,16 @@ class CephadmAgentHelpers:
                 host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
             message_thread.start()
 
+    def _shutdown_node_proxy(self) -> None:
+        hosts = set([h for h in self.mgr.cache.get_hosts() if
+                     (h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))])
+
+        for host in hosts:
+            payload: Dict[str, Any] = {'node_proxy_shutdown': host}
+            message_thread = AgentMessageThread(
+                host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr)
+            message_thread.start()
+
     def _request_ack_all_not_up_to_date(self) -> None:
         self.mgr.agent_helpers._request_agent_acks(
             set([h for h in self.mgr.cache.get_hosts() if
@@ -971,10 +981,11 @@ class CephadmAgentHelpers:
             if 'agent' in self.mgr.spec_store:
                 self.mgr.spec_store.rm('agent')
                 need_apply = True
-            self.mgr.agent_cache.agent_counter = {}
-            self.mgr.agent_cache.agent_timestamp = {}
-            self.mgr.agent_cache.agent_keys = {}
-            self.mgr.agent_cache.agent_ports = {}
+            if not self.mgr.cache.get_daemons_by_service('agent'):
+                self.mgr.agent_cache.agent_counter = {}
+                self.mgr.agent_cache.agent_timestamp = {}
+                self.mgr.agent_cache.agent_keys = {}
+                self.mgr.agent_cache.agent_ports = {}
         return need_apply
 
     def _check_agent(self, host: str) -> bool:
index 7d7a04dad9d9c3aa2ed4490abe92aeba551e4bdc..d91a3a3f22d3fd3c112adb7dab87b08a49b04a02 100644 (file)
@@ -1223,6 +1223,16 @@ class CephadmAgent(CephService):
 
         return daemon_spec
 
+    def pre_remove(self, daemon: DaemonDescription) -> None:
+        super().pre_remove(daemon)
+
+        assert daemon.daemon_id is not None
+        daemon_id: str = daemon.daemon_id
+
+        logger.info('Removing agent %s...' % daemon_id)
+
+        self.mgr.agent_helpers._shutdown_node_proxy()
+
     def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
         agent = self.mgr.http_server.agent
         try: