When the agent is removed, the daemon is abruptly stopped.
Since the node-proxy logic runs from within the cephadm agent,
an abrupt stop leaves an active RedFish session behind. The idea
is to gracefully shut down the agent so that node-proxy can catch
that event and close the currently active RedFish session before
the daemon exits.
Signed-off-by: Guillaume Abrioux <gabrioux@ibm.com>
(cherry picked from commit 79bfe642001a7f9e1da28f987d1edb45174f6e86)
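For context, a minimal sketch of the agent-side contract this commit
relies on. Names such as RedfishSession and NodeProxy are hypothetical,
since the node-proxy internals are not part of this diff: the agent's
shutdown() sets an event, and the node-proxy thread logs out of its
RedFish session before exiting, instead of being killed with the
session still open.

    import threading

    class RedfishSession:
        """Hypothetical stand-in for an authenticated RedFish session."""
        def __init__(self, url: str) -> None:
            self.url = url
            self.active = True

        def logout(self) -> None:
            # A real client would DELETE the session resource here so
            # the BMC does not keep a stale session open.
            self.active = False

    class NodeProxy(threading.Thread):
        """Sketch: poll until the agent signals shutdown, then log out."""
        def __init__(self, session: RedfishSession) -> None:
            super().__init__()
            self.session = session
            self.stop_event = threading.Event()

        def run(self) -> None:
            while not self.stop_event.wait(timeout=5):
                pass  # periodic hardware polling over RedFish goes here
            # Graceful path: agent.shutdown() set the event, so the
            # session is closed before the daemon process exits.
            self.session.logout()

        def shutdown(self) -> None:
            self.stop_event.set()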
conn.send(err_str.encode())
logger.error(err_str)
else:
- conn.send(b'ACK')
- if 'config' in data:
- self.agent.wakeup()
- self.agent.ls_gatherer.wakeup()
- self.agent.volume_gatherer.wakeup()
- logger.debug(f'Got mgr message {data}')
+ if 'counter' in data:
+ conn.send(b'ACK')
+ if 'config' in data:
+ self.agent.wakeup()
+ self.agent.ls_gatherer.wakeup()
+ self.agent.volume_gatherer.wakeup()
+ logger.debug(f'Got mgr message {data}')
except Exception as e:
logger.error(f'Mgr Listener encountered exception: {e}')
self.stop = True
def handle_json_payload(self, data: Dict[Any, Any]) -> None:
- self.agent.ack = int(data['counter'])
- if 'config' in data:
- logger.info('Received new config from mgr')
- config = data['config']
- for filename in config:
- if filename in self.agent.required_files:
- file_path = os.path.join(self.agent.daemon_dir, filename)
- with write_new(file_path) as f:
- f.write(config[filename])
- self.agent.pull_conf_settings()
- self.agent.wakeup()
+ if 'counter' in data:
+ self.agent.ack = int(data['counter'])
+ if 'config' in data:
+ logger.info('Received new config from mgr')
+ config = data['config']
+ for filename in config:
+ if filename in self.agent.required_files:
+ file_path = os.path.join(self.agent.daemon_dir, filename)
+ with write_new(file_path) as f:
+ f.write(config[filename])
+ self.agent.pull_conf_settings()
+ self.agent.wakeup()
+ elif 'node_proxy_shutdown' in data:
+ self.agent.shutdown()
+ else:
+ raise RuntimeError('No valid data received.')
+
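With this change the listener dispatches on exactly three payload
shapes. A quick sketch with illustrative values only; the keys are the
ones checked above:

    # Regular ack, optionally carrying refreshed config files:
    payload = {'counter': 7}
    payload = {'counter': 8, 'config': {'agent.json': '...'}}

    # Sent once per host by _shutdown_node_proxy() during agent
    # removal; any other shape now raises
    # RuntimeError('No valid data received.')
    payload = {'node_proxy_shutdown': 'host01'}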
class CephadmAgent():
host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
message_thread.start()
+ def _shutdown_node_proxy(self) -> None:
+ hosts = set([h for h in self.mgr.cache.get_hosts() if
+ (h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))])
+
+ for host in hosts:
+ payload: Dict[str, Any] = {'node_proxy_shutdown': host}
+ message_thread = AgentMessageThread(
+ host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr)
+ message_thread.start()
+
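Note the host selection in _shutdown_node_proxy(): only hosts with a
known agent port are messaged, and hosts whose agent is already being
messaged (messaging_agent(h)) are skipped, presumably so the shutdown
request does not compete with an in-flight message thread.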
def _request_ack_all_not_up_to_date(self) -> None:
self.mgr.agent_helpers._request_agent_acks(
set([h for h in self.mgr.cache.get_hosts() if
if 'agent' in self.mgr.spec_store:
self.mgr.spec_store.rm('agent')
need_apply = True
- self.mgr.agent_cache.agent_counter = {}
- self.mgr.agent_cache.agent_timestamp = {}
- self.mgr.agent_cache.agent_keys = {}
- self.mgr.agent_cache.agent_ports = {}
+ if not self.mgr.cache.get_daemons_by_service('agent'):
+ self.mgr.agent_cache.agent_counter = {}
+ self.mgr.agent_cache.agent_timestamp = {}
+ self.mgr.agent_cache.agent_keys = {}
+ self.mgr.agent_cache.agent_ports = {}
return need_apply
def _check_agent(self, host: str) -> bool:
return daemon_spec
+ def pre_remove(self, daemon: DaemonDescription) -> None:
+ super().pre_remove(daemon)
+
+ assert daemon.daemon_id is not None
+ daemon_id: str = daemon.daemon_id
+
+ logger.info('Removing agent %s...' % daemon_id)
+
+ self.mgr.agent_helpers._shutdown_node_proxy()
+
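End to end, the graceful path now reads: CephadmAgent.pre_remove()
calls _shutdown_node_proxy(), which sends {'node_proxy_shutdown': host}
to each reachable agent; the agent's listener dispatches that payload
to self.agent.shutdown(), giving node-proxy the chance to close its
RedFish session before the daemon exits.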
def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
agent = self.mgr.http_server.agent
try: