class AgentMessageThread(threading.Thread):
- def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator") -> None:
+ def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
self.mgr = mgr
self.host = host
self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
self.port = port
self.data: str = json.dumps(data)
+ self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec
super(AgentMessageThread, self).__init__(target=self.run)
def run(self) -> None:
secure_agent_socket.sendall(msg.encode('utf-8'))
agent_response = secure_agent_socket.recv(1024).decode()
self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
+ if self.daemon_spec:
+ self.mgr.cache.agent_config_successfully_delivered(self.daemon_spec)
self.mgr.cache.sending_agent_message[self.host] = False
return
except ConnectionError as e:
return
-class AgentLockException(Exception):
- pass
-
-
class CephadmAgentHelpers:
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
- def _request_agent_acks(self, hosts: Set[str], increment: bool = False, new_config: Optional[Dict[str, str]] = None) -> None:
+ def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
for host in hosts:
if increment:
self.mgr.cache.metadata_up_to_date[host] = False
elif increment:
self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1
payload: Dict[str, Any] = {'counter': self.mgr.cache.agent_counter[host]}
- if new_config:
- payload['config'] = new_config
+ if daemon_spec:
+ payload['config'] = daemon_spec.final_config
message_thread = AgentMessageThread(
- host, self.mgr.cache.agent_ports[host], payload, self.mgr)
+ host, self.mgr.cache.agent_ports[host], payload, self.mgr, daemon_spec)
message_thread.start()
def _request_ack_all_not_up_to_date(self) -> None:
# just set the timestamp to now. However, if host was offline before, we
# should not allow creating a new timestamp to cause it to be marked online
if host not in self.mgr.cache.agent_timestamp:
- self.mgr.cache.agent_timestamp[host] = datetime_now()
if host in self.mgr.offline_hosts:
return False
+ self.mgr.cache.agent_timestamp[host] = datetime_now()
# agent hasn't reported in down multiplier * it's refresh rate. Something is likely wrong with it.
down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
time_diff = datetime_now() - self.mgr.cache.agent_timestamp[host]
return need_apply
def _check_agent(self, host: str) -> bool:
+ down = False
try:
assert self.mgr.cherrypy_thread
assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
except Exception:
self.mgr.log.debug(
f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
- return False
+ return down
if self.mgr.agent_helpers._agent_down(host):
- return True
- else:
- try:
- agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
- assert agent.daemon_id is not None
- assert agent.hostname is not None
- except Exception as e:
- self.mgr.log.debug(
- f'Could not retrieve agent on host {host} from daemon cache: {e}')
- return False
+ down = True
+ try:
+ agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
+ assert agent.daemon_id is not None
+ assert agent.hostname is not None
+ except Exception as e:
+ self.mgr.log.debug(
+ f'Could not retrieve agent on host {host} from daemon cache: {e}')
+ return down
+ try:
+ spec = self.mgr.spec_store.active_specs.get('agent', None)
+ deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
+ last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
+ host, agent.name())
+ if not last_config or last_deps != deps:
+ # if root cert is the dep that changed, we must use ssh to reconfig
+ # so it's necessary to check this one specifically
+ root_cert_match = False
+ try:
+ root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+ if last_deps and root_cert in last_deps:
+ root_cert_match = True
+ except Exception:
+ pass
+ daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
+ # we need to know the agent port to try to reconfig w/ http
+ # otherwise there is no choice but a full ssh reconfig
+ if host in self.mgr.cache.agent_ports and root_cert_match and not down:
+ daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
+ daemon_spec.daemon_type)].prepare_create(daemon_spec)
+ self.mgr.agent_helpers._request_agent_acks(
+ hosts={daemon_spec.host},
+ increment=True,
+ daemon_spec=daemon_spec,
+ )
+ else:
+ self.mgr._daemon_action(daemon_spec, action='reconfig')
+ return down
+ except Exception as e:
+ self.mgr.log.debug(
+ f'Agent on host {host} not ready to have config and deps checked: {e}')
+ action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
+ if action:
try:
- spec = self.mgr.spec_store.active_specs.get('agent', None)
- deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
- last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
- host, agent.name())
- if not last_config or last_deps != deps:
- # if root cert is the dep that changed, we must use ssh to reconfig
- # so it's necessary to check this one specifically
- root_cert_match = False
- try:
- root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
- if last_deps and root_cert in last_deps:
- root_cert_match = True
- except Exception:
- pass
- daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
- # we need to know the agent port to try to reconfig w/ http
- # otherwise there is no choice but a full ssh reconfig
- if host in self.mgr.cache.agent_ports and root_cert_match:
- daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
- daemon_spec.daemon_type)].prepare_create(daemon_spec)
- self.mgr.cache.agent_timestamp[daemon_spec.host] = datetime_now()
- self.mgr.cache.agent_counter[daemon_spec.host] = 1
- self.mgr.agent_helpers._request_agent_acks(
- hosts={daemon_spec.host},
- increment=True,
- new_config=daemon_spec.final_config
- )
- self.mgr.cache.update_daemon_config_deps(
- daemon_spec.host, daemon_spec.name(), daemon_spec.deps, datetime_now())
- self.mgr.cache.save_host(daemon_spec.host)
- else:
- self.mgr._daemon_action(daemon_spec, action='reconfig')
- return False
+ daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
+ self.mgr._daemon_action(daemon_spec, action=action)
+ self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
except Exception as e:
self.mgr.log.debug(
- f'Agent on host {host} not ready to have config and deps checked: {e}')
- action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
- if action:
- try:
- daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
- self.mgr._daemon_action(daemon_spec, action=action)
- self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
- except Exception as e:
- self.mgr.log.debug(
- f'Agent on host {host} not ready to {action}: {e}')
- return False
+ f'Agent on host {host} not ready to {action}: {e}')
+ return down
class SSLCerts: